YARN-8833. Avoid potential integer overflow when computing fair shares. Contributed by liyakun.
This commit is contained in:
parent
2a416fe9f2
commit
c54a689d96
@ -32,10 +32,13 @@
|
|||||||
* consumption lies at or below its fair share will never have its containers
|
* consumption lies at or below its fair share will never have its containers
|
||||||
* preempted.
|
* preempted.
|
||||||
*/
|
*/
|
||||||
public class ComputeFairShares {
|
public final class ComputeFairShares {
|
||||||
|
|
||||||
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
|
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
|
||||||
|
|
||||||
|
private ComputeFairShares() {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute fair share of the given schedulables.Fair share is an allocation of
|
* Compute fair share of the given schedulables.Fair share is an allocation of
|
||||||
* shares considering only active schedulables ie schedulables which have
|
* shares considering only active schedulables ie schedulables which have
|
||||||
@ -101,19 +104,20 @@ public static void computeSteadyShares(
|
|||||||
* all Schedulables are only given their minShare) and an upper bound computed
|
* all Schedulables are only given their minShare) and an upper bound computed
|
||||||
* to be large enough that too many slots are given (by doubling R until we
|
* to be large enough that too many slots are given (by doubling R until we
|
||||||
* use more than totalResources resources). The helper method
|
* use more than totalResources resources). The helper method
|
||||||
* resourceUsedWithWeightToResourceRatio computes the total resources used with a
|
* resourceUsedWithWeightToResourceRatio computes the total resources used
|
||||||
* given value of R.
|
* with a given value of R.
|
||||||
* <p>
|
* <p>
|
||||||
* The running time of this algorithm is linear in the number of Schedulables,
|
* The running time of this algorithm is linear in the number of Schedulables,
|
||||||
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
|
* because resourceUsedWithWeightToResourceRatio is linear-time and the
|
||||||
* iterations of binary search is a constant (dependent on desired precision).
|
* number of iterations of binary search is a constant (dependent on desired
|
||||||
|
* precision).
|
||||||
*/
|
*/
|
||||||
private static void computeSharesInternal(
|
private static void computeSharesInternal(
|
||||||
Collection<? extends Schedulable> allSchedulables,
|
Collection<? extends Schedulable> allSchedulables,
|
||||||
Resource totalResources, ResourceType type, boolean isSteadyShare) {
|
Resource totalResources, ResourceType type, boolean isSteadyShare) {
|
||||||
|
|
||||||
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
|
Collection<Schedulable> schedulables = new ArrayList<Schedulable>();
|
||||||
int takenResources = handleFixedFairShares(
|
long takenResources = handleFixedFairShares(
|
||||||
allSchedulables, schedulables, isSteadyShare, type);
|
allSchedulables, schedulables, isSteadyShare, type);
|
||||||
|
|
||||||
if (schedulables.isEmpty()) {
|
if (schedulables.isEmpty()) {
|
||||||
@ -122,12 +126,11 @@ private static void computeSharesInternal(
|
|||||||
// Find an upper bound on R that we can use in our binary search. We start
|
// Find an upper bound on R that we can use in our binary search. We start
|
||||||
// at R = 1 and double it until we have either used all the resources or we
|
// at R = 1 and double it until we have either used all the resources or we
|
||||||
// have met all Schedulables' max shares.
|
// have met all Schedulables' max shares.
|
||||||
int totalMaxShare = 0;
|
long totalMaxShare = 0;
|
||||||
for (Schedulable sched : schedulables) {
|
for (Schedulable sched : schedulables) {
|
||||||
long maxShare = getResourceValue(sched.getMaxShare(), type);
|
long maxShare = getResourceValue(sched.getMaxShare(), type);
|
||||||
totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
|
totalMaxShare = safeAdd(maxShare, totalMaxShare);
|
||||||
Integer.MAX_VALUE);
|
if (totalMaxShare == Long.MAX_VALUE) {
|
||||||
if (totalMaxShare == Integer.MAX_VALUE) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -146,7 +149,7 @@ private static void computeSharesInternal(
|
|||||||
double right = rMax;
|
double right = rMax;
|
||||||
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
|
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
|
||||||
double mid = (left + right) / 2.0;
|
double mid = (left + right) / 2.0;
|
||||||
int plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
|
long plannedResourceUsed = resourceUsedWithWeightToResourceRatio(
|
||||||
mid, schedulables, type);
|
mid, schedulables, type);
|
||||||
if (plannedResourceUsed == totalResource) {
|
if (plannedResourceUsed == totalResource) {
|
||||||
right = mid;
|
right = mid;
|
||||||
@ -171,14 +174,18 @@ private static void computeSharesInternal(
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Compute the resources that would be used given a weight-to-resource ratio
|
* Compute the resources that would be used given a weight-to-resource ratio
|
||||||
* w2rRatio, for use in the computeFairShares algorithm as described in #
|
* w2rRatio, for use in the computeFairShares algorithm as described in
|
||||||
|
* {@link #computeSharesInternal}.
|
||||||
*/
|
*/
|
||||||
private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
|
private static long resourceUsedWithWeightToResourceRatio(double w2rRatio,
|
||||||
Collection<? extends Schedulable> schedulables, ResourceType type) {
|
Collection<? extends Schedulable> schedulables, ResourceType type) {
|
||||||
int resourcesTaken = 0;
|
long resourcesTaken = 0;
|
||||||
for (Schedulable sched : schedulables) {
|
for (Schedulable sched : schedulables) {
|
||||||
int share = computeShare(sched, w2rRatio, type);
|
long share = computeShare(sched, w2rRatio, type);
|
||||||
resourcesTaken += share;
|
resourcesTaken = safeAdd(resourcesTaken, share);
|
||||||
|
if (resourcesTaken == Long.MAX_VALUE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return resourcesTaken;
|
return resourcesTaken;
|
||||||
}
|
}
|
||||||
@ -187,12 +194,12 @@ private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
|
|||||||
* Compute the resources assigned to a Schedulable given a particular
|
* Compute the resources assigned to a Schedulable given a particular
|
||||||
* weight-to-resource ratio w2rRatio.
|
* weight-to-resource ratio w2rRatio.
|
||||||
*/
|
*/
|
||||||
private static int computeShare(Schedulable sched, double w2rRatio,
|
private static long computeShare(Schedulable sched, double w2rRatio,
|
||||||
ResourceType type) {
|
ResourceType type) {
|
||||||
double share = sched.getWeights().getWeight(type) * w2rRatio;
|
double share = sched.getWeights().getWeight(type) * w2rRatio;
|
||||||
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
|
share = Math.max(share, getResourceValue(sched.getMinShare(), type));
|
||||||
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
|
share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
|
||||||
return (int) share;
|
return (long) share;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -200,11 +207,11 @@ private static int computeShare(Schedulable sched, double w2rRatio,
|
|||||||
* Returns the resources taken by fixed fairshare schedulables,
|
* Returns the resources taken by fixed fairshare schedulables,
|
||||||
* and adds the remaining to the passed nonFixedSchedulables.
|
* and adds the remaining to the passed nonFixedSchedulables.
|
||||||
*/
|
*/
|
||||||
private static int handleFixedFairShares(
|
private static long handleFixedFairShares(
|
||||||
Collection<? extends Schedulable> schedulables,
|
Collection<? extends Schedulable> schedulables,
|
||||||
Collection<Schedulable> nonFixedSchedulables,
|
Collection<Schedulable> nonFixedSchedulables,
|
||||||
boolean isSteadyShare, ResourceType type) {
|
boolean isSteadyShare, ResourceType type) {
|
||||||
int totalResource = 0;
|
long totalResource = 0;
|
||||||
|
|
||||||
for (Schedulable sched : schedulables) {
|
for (Schedulable sched : schedulables) {
|
||||||
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
|
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
|
||||||
@ -216,15 +223,15 @@ private static int handleFixedFairShares(
|
|||||||
? ((FSQueue)sched).getSteadyFairShare()
|
? ((FSQueue)sched).getSteadyFairShare()
|
||||||
: sched.getFairShare(),
|
: sched.getFairShare(),
|
||||||
type);
|
type);
|
||||||
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
|
totalResource = safeAdd(totalResource, fixedShare);
|
||||||
Integer.MAX_VALUE);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return totalResource;
|
return totalResource;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
|
* Get the fairshare for the {@link Schedulable} if it is fixed,
|
||||||
|
* -1 otherwise.
|
||||||
*
|
*
|
||||||
* The fairshare is fixed if either the maxShare is 0, weight is 0,
|
* The fairshare is fixed if either the maxShare is 0, weight is 0,
|
||||||
* or the Schedulable is not active for instantaneous fairshare.
|
* or the Schedulable is not active for instantaneous fairshare.
|
||||||
@ -275,4 +282,21 @@ private static void setResourceValue(long val, Resource resource, ResourceType t
|
|||||||
throw new IllegalArgumentException("Invalid resource");
|
throw new IllegalArgumentException("Invalid resource");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Safely add two long values. The result will always be a valid long value.
|
||||||
|
* If the addition caused an overflow the return value will be set to
|
||||||
|
* <code>Long.MAX_VALUE</code>.
|
||||||
|
* @param a first long to add
|
||||||
|
* @param b second long to add
|
||||||
|
* @return result of the addition
|
||||||
|
*/
|
||||||
|
private static long safeAdd(long a, long b) {
|
||||||
|
long r = a + b;
|
||||||
|
// Overflow iff both arguments have the opposite sign of the result
|
||||||
|
if (((a ^ r) & (b ^ r)) < 0) {
|
||||||
|
r = Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
return r;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -69,6 +69,19 @@ public FakeSchedulable(Resource minShare, ResourceWeights weights) {
|
|||||||
weights, Resources.createResource(0, 0), Resources.createResource(0, 0), 0);
|
weights, Resources.createResource(0, 0), Resources.createResource(0, 0), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FakeSchedulable(long minShare, long maxShare) {
|
||||||
|
this(minShare, maxShare, 1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FakeSchedulable(long minShare, long maxShare, float weights) {
|
||||||
|
this(Resources.createResource(minShare, 0),
|
||||||
|
Resources.createResource(maxShare, 0),
|
||||||
|
new ResourceWeights(weights),
|
||||||
|
Resources.createResource(0, 0),
|
||||||
|
Resources.createResource(0, 0),
|
||||||
|
0);
|
||||||
|
}
|
||||||
|
|
||||||
public FakeSchedulable(Resource minShare, Resource maxShare,
|
public FakeSchedulable(Resource minShare, Resource maxShare,
|
||||||
ResourceWeights weight, Resource fairShare, Resource usage, long startTime) {
|
ResourceWeights weight, Resource fairShare, Resource usage, long startTime) {
|
||||||
this.minShare = minShare;
|
this.minShare = minShare;
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
@ -38,7 +39,7 @@ public class TestComputeFairShares {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
scheds = new ArrayList<Schedulable>();
|
scheds = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -147,19 +148,69 @@ public void testWeightedSharingWithMinShares() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that shares are computed accurately even when the number of slots is
|
* Test that shares are computed accurately even when the number of
|
||||||
* very large.
|
* resources is very large.
|
||||||
|
* Test adapted to accommodate long values for resources.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testLargeShares() {
|
public void testLargeShares() {
|
||||||
int million = 1000 * 1000;
|
long giga = 1000L * 1000L * 1000L * 4L;
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
scheds.add(new FakeSchedulable());
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
ComputeFairShares.computeShares(scheds,
|
ComputeFairShares.computeShares(scheds,
|
||||||
Resources.createResource(40 * million), ResourceType.MEMORY);
|
Resources.createResource(40 * giga), ResourceType.MEMORY);
|
||||||
verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
|
verifyMemoryShares(giga, giga, giga, giga);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overflow in the resources taken and upper bound.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testLargeMinimums() {
|
||||||
|
long giga = 1000L * 1000L * 1000L * 4L;
|
||||||
|
scheds.add(new FakeSchedulable(Long.MAX_VALUE, Long.MAX_VALUE));
|
||||||
|
scheds.add(new FakeSchedulable(giga, giga));
|
||||||
|
ComputeFairShares.computeShares(scheds,
|
||||||
|
Resources.createResource(4 * giga),
|
||||||
|
ResourceType.MEMORY);
|
||||||
|
verifyMemoryShares(Long.MAX_VALUE, giga);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overflow in the upper bound calculation for the binary search.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testOverflowMaxShare() {
|
||||||
|
long giga = 1000L * 1000L * 1000L;
|
||||||
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
|
scheds.add(new FakeSchedulable(0L, Long.MAX_VALUE));
|
||||||
|
ComputeFairShares.computeShares(scheds,
|
||||||
|
Resources.createResource(2 * giga),
|
||||||
|
ResourceType.MEMORY);
|
||||||
|
verifyMemoryShares(giga, giga);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test overflow in the fixed share calculations. The 3th schedulable should
|
||||||
|
* not get any share as all resources are taken by the handleFixedShare()
|
||||||
|
* call.
|
||||||
|
* With the overflow it looked like there were more resources available then
|
||||||
|
* there really are.
|
||||||
|
* The values in the test might not be "real" but they show the overflow.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testOverflowFixedShare() {
|
||||||
|
long giga = 1000L * 1000L * 1000L;
|
||||||
|
long minValue = Long.MAX_VALUE - 1L;
|
||||||
|
scheds.add(new FakeSchedulable(giga, giga, 0));
|
||||||
|
scheds.add(new FakeSchedulable(minValue, Long.MAX_VALUE, 0));
|
||||||
|
scheds.add(new FakeSchedulable(0L, giga));
|
||||||
|
ComputeFairShares.computeShares(scheds,
|
||||||
|
Resources.createResource(1000L),
|
||||||
|
ResourceType.MEMORY);
|
||||||
|
verifyMemoryShares(giga, minValue, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -173,7 +224,7 @@ public void testEmptyList() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that CPU works as well as memory
|
* Test that CPU works as well as memory.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCPU() {
|
public void testCPU() {
|
||||||
@ -193,10 +244,12 @@ public void testCPU() {
|
|||||||
/**
|
/**
|
||||||
* Check that a given list of shares have been assigned to this.scheds.
|
* Check that a given list of shares have been assigned to this.scheds.
|
||||||
*/
|
*/
|
||||||
private void verifyMemoryShares(int... shares) {
|
private void verifyMemoryShares(long... shares) {
|
||||||
Assert.assertEquals(scheds.size(), shares.length);
|
Assert.assertEquals("Number of shares and schedulables are not consistent",
|
||||||
|
scheds.size(), shares.length);
|
||||||
for (int i = 0; i < shares.length; i++) {
|
for (int i = 0; i < shares.length; i++) {
|
||||||
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemorySize());
|
Assert.assertEquals("Expected share number " + i + " in list wrong",
|
||||||
|
shares[i], scheds.get(i).getFairShare().getMemorySize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,9 +257,11 @@ private void verifyMemoryShares(int... shares) {
|
|||||||
* Check that a given list of shares have been assigned to this.scheds.
|
* Check that a given list of shares have been assigned to this.scheds.
|
||||||
*/
|
*/
|
||||||
private void verifyCPUShares(int... shares) {
|
private void verifyCPUShares(int... shares) {
|
||||||
Assert.assertEquals(scheds.size(), shares.length);
|
Assert.assertEquals("Number of shares and schedulables are not consistent",
|
||||||
|
scheds.size(), shares.length);
|
||||||
for (int i = 0; i < shares.length; i++) {
|
for (int i = 0; i < shares.length; i++) {
|
||||||
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores());
|
Assert.assertEquals("Expected share number " + i + " in list wrong",
|
||||||
|
shares[i], scheds.get(i).getFairShare().getVirtualCores());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user