diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java index 3fe0c68986b..97bb4c56175 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java @@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; +import static java.lang.Math.addExact; + /** * Contains logic for computing the fair shares. A {@link Schedulable}'s fair * share is {@link Resource} it is entitled to, independent of the current @@ -31,10 +33,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; * consumption lies at or below its fair share will never have its containers * preempted. */ -public class ComputeFairShares { +public final class ComputeFairShares { private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25; + private ComputeFairShares() { + } + /** * Compute fair share of the given schedulables.Fair share is an allocation of * shares considering only active schedulables ie schedulables which have @@ -100,19 +105,20 @@ public class ComputeFairShares { * all Schedulables are only given their minShare) and an upper bound computed * to be large enough that too many slots are given (by doubling R until we * use more than totalResources resources). The helper method - * resourceUsedWithWeightToResourceRatio computes the total resources used with a - * given value of R. + * resourceUsedWithWeightToResourceRatio computes the total resources used + * with a given value of R. *

* The running time of this algorithm is linear in the number of Schedulables, - * because resourceUsedWithWeightToResourceRatio is linear-time and the number of - * iterations of binary search is a constant (dependent on desired precision). + * because resourceUsedWithWeightToResourceRatio is linear-time and the + * number of iterations of binary search is a constant (dependent on desired + * precision). */ private static void computeSharesInternal( Collection allSchedulables, Resource totalResources, String type, boolean isSteadyShare) { Collection schedulables = new ArrayList<>(); - int takenResources = handleFixedFairShares( + long takenResources = handleFixedFairShares( allSchedulables, schedulables, isSteadyShare, type); if (schedulables.isEmpty()) { @@ -121,12 +127,11 @@ public class ComputeFairShares { // Find an upper bound on R that we can use in our binary search. We start // at R = 1 and double it until we have either used all the resources or we // have met all Schedulables' max shares. - int totalMaxShare = 0; + long totalMaxShare = 0; for (Schedulable sched : schedulables) { long maxShare = sched.getMaxShare().getResourceValue(type); - totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare, - Integer.MAX_VALUE); - if (totalMaxShare == Integer.MAX_VALUE) { + totalMaxShare = safeAdd(maxShare, totalMaxShare); + if (totalMaxShare == Long.MAX_VALUE) { break; } } @@ -166,23 +171,24 @@ public class ComputeFairShares { target = sched.getFairShare(); } - target.setResourceValue(type, (long)computeShare(sched, right, type)); + target.setResourceValue(type, computeShare(sched, right, type)); } } /** * Compute the resources that would be used given a weight-to-resource ratio - * w2rRatio, for use in the computeFairShares algorithm as described in # + * w2rRatio, for use in the computeFairShares algorithm as described in + * {@link #computeSharesInternal}. */ private static long resourceUsedWithWeightToResourceRatio(double w2rRatio, Collection schedulables, String type) { long resourcesTaken = 0; for (Schedulable sched : schedulables) { long share = computeShare(sched, w2rRatio, type); - if (Long.MAX_VALUE - resourcesTaken < share) { - return Long.MAX_VALUE; + resourcesTaken = safeAdd(resourcesTaken, share); + if (resourcesTaken == Long.MAX_VALUE) { + break; } - resourcesTaken += share; } return resourcesTaken; } @@ -204,11 +210,11 @@ public class ComputeFairShares { * Returns the resources taken by fixed fairshare schedulables, * and adds the remaining to the passed nonFixedSchedulables. */ - private static int handleFixedFairShares( + private static long handleFixedFairShares( Collection schedulables, Collection nonFixedSchedulables, boolean isSteadyShare, String type) { - int totalResource = 0; + long totalResource = 0; for (Schedulable sched : schedulables) { long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type); @@ -224,15 +230,15 @@ public class ComputeFairShares { } target.setResourceValue(type, fixedShare); - totalResource = (int) Math.min((long)totalResource + (long)fixedShare, - Integer.MAX_VALUE); + totalResource = safeAdd(totalResource, fixedShare); } } return totalResource; } /** - * Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise. + * Get the fairshare for the {@link Schedulable} if it is fixed, + * -1 otherwise. * * The fairshare is fixed if either the maxShare is 0, weight is 0, * or the Schedulable is not active for instantaneous fairshare. @@ -259,4 +265,20 @@ public class ComputeFairShares { return -1; } + + /** + * Safely add two long values. The result will always be a valid long value. + * If the addition caused an overflow the return value will be set to + * Long.MAX_VALUE. + * @param a first long to add + * @param b second long to add + * @return result of the addition + */ + private static long safeAdd(long a, long b) { + try { + return addExact(a, b); + } catch (ArithmeticException ae) { + return Long.MAX_VALUE; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index b1fc2d0bd94..c42084e5198 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -70,7 +70,18 @@ public class FakeSchedulable implements Schedulable { weights, Resources.createResource(0, 0), Resources.createResource(0, 0), 0); } - + + public FakeSchedulable(long minShare, long maxShare) { + this(minShare, maxShare, 1L); + } + + public FakeSchedulable(long minShare, long maxShare, float weights) { + this(Resources.createResource(minShare, 0), + Resources.createResource(maxShare, 0), + weights, Resources.createResource(0, 0), + Resources.createResource(0, 0), 0); + } + public FakeSchedulable(Resource minShare, Resource maxShare, float weight, Resource fairShare, Resource usage, long startTime) { this.minShare = minShare; @@ -146,8 +157,8 @@ public class FakeSchedulable implements Schedulable { return true; } - public void setResourceUsage(Resource usage) { - this.usage = usage; + public void setResourceUsage(Resource resourceUsage) { + this.usage = resourceUsage; } public final void start(long time) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java index b1666d61a59..5d3d49ab822 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java @@ -19,10 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.junit.Assert; @@ -40,7 +38,7 @@ public class TestComputeFairShares { @Before public void setUp() throws Exception { - scheds = new ArrayList(); + scheds = new ArrayList<>(); } /** @@ -149,22 +147,72 @@ public class TestComputeFairShares { } /** - * Test that shares are computed accurately even when the number of slots is - * very large. + * Test that shares are computed accurately even when the number of + * resources is very large. + * Test adapted to accommodate long values for resources. */ @Test public void testLargeShares() { - int million = 1000 * 1000; - scheds.add(new FakeSchedulable()); - scheds.add(new FakeSchedulable()); - scheds.add(new FakeSchedulable()); - scheds.add(new FakeSchedulable()); + long giga = 1000L * 1000L * 1000L * 4L; + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, giga)); ComputeFairShares.computeShares(scheds, - Resources.createResource(40 * million), + Resources.createResource(4 * giga), ResourceInformation.MEMORY_MB.getName()); - verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million); + verifyMemoryShares(giga, giga, giga, giga); } - + + /** + * Test overflow in the resources taken and upper bound. + */ + @Test + public void testLargeMinimums() { + long giga = 1000L * 1000L * 1000L * 4L; + scheds.add(new FakeSchedulable(Long.MAX_VALUE, Long.MAX_VALUE)); + scheds.add(new FakeSchedulable(giga, giga)); + ComputeFairShares.computeShares(scheds, + Resources.createResource(4 * giga), + ResourceInformation.MEMORY_MB.getName()); + verifyMemoryShares(Long.MAX_VALUE, giga); + } + + /** + * Test overflow in the upper bound calculation for the binary search. + */ + @Test + public void testOverflowMaxShare() { + long giga = 1000L * 1000L * 1000L; + scheds.add(new FakeSchedulable(0L, giga)); + scheds.add(new FakeSchedulable(0L, Long.MAX_VALUE)); + ComputeFairShares.computeShares(scheds, + Resources.createResource(2 * giga), + ResourceInformation.MEMORY_MB.getName()); + verifyMemoryShares(giga, giga); + } + + /** + * Test overflow in the fixed share calculations. The 3th schedulable should + * not get any share as all resources are taken by the handleFixedShare() + * call. + * With the overflow it looked like there were more resources available then + * there really are. + * The values in the test might not be "real" but they show the overflow. + */ + @Test + public void testOverflowFixedShare() { + long giga = 1000L * 1000L * 1000L; + long minValue = Long.MAX_VALUE - 1L; + scheds.add(new FakeSchedulable(giga, giga, 0)); + scheds.add(new FakeSchedulable(minValue, Long.MAX_VALUE, 0)); + scheds.add(new FakeSchedulable(0L, giga)); + ComputeFairShares.computeShares(scheds, + Resources.createResource(1000L), + ResourceInformation.MEMORY_MB.getName()); + verifyMemoryShares(giga, minValue, 0); + } + /** * Test that being called on an empty list doesn't confuse the algorithm. */ @@ -176,7 +224,7 @@ public class TestComputeFairShares { } /** - * Test that CPU works as well as memory + * Test that CPU works as well as memory. */ @Test public void testCPU() { @@ -192,10 +240,12 @@ public class TestComputeFairShares { /** * Check that a given list of shares have been assigned to this.scheds. */ - private void verifyMemoryShares(int... shares) { - Assert.assertEquals(scheds.size(), shares.length); + private void verifyMemoryShares(long... shares) { + Assert.assertEquals("Number of shares and schedulables are not consistent", + scheds.size(), shares.length); for (int i = 0; i < shares.length; i++) { - Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemorySize()); + Assert.assertEquals("Expected share number " + i + " in list wrong", + shares[i], scheds.get(i).getFairShare().getMemorySize()); } } @@ -203,23 +253,11 @@ public class TestComputeFairShares { * Check that a given list of shares have been assigned to this.scheds. */ private void verifyCPUShares(int... shares) { - Assert.assertEquals(scheds.size(), shares.length); + Assert.assertEquals("Number of shares and schedulables are not consistent", + scheds.size(), shares.length); for (int i = 0; i < shares.length; i++) { - Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores()); + Assert.assertEquals("Expected share number " + i + " in list wrong", + shares[i], scheds.get(i).getFairShare().getVirtualCores()); } } - - /** - * Test computeShares will not enter into infinite loop. - */ - @Test(timeout = 10000) - public void testResourceUsedWithWeightToResourceRatio() { - Collection schedulables = new ArrayList<>(); - schedulables.add(new FakeSchedulable(Integer.MAX_VALUE)); - schedulables.add(new FakeSchedulable(Integer.MAX_VALUE)); - - Resource totalResource = Resource.newInstance(Integer.MAX_VALUE, 0); - ComputeFairShares.computeShares( - schedulables, totalResource, ResourceInformation.MEMORY_URI); - } }