From be2afb4a5dbddf5d4f06e5242a51a3b86c847070 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 29 Jan 2016 10:53:31 +0800 Subject: [PATCH] YARN-4172. Extend DominantResourceCalculator to account for all resources. (Varun Vasudev via wangda) (cherry picked from commit 32c91223f1bd06561ea4ce2d1944e8d9a847f18c) --- .../resource/DominantResourceCalculator.java | 384 +++++++++++++----- 1 file changed, 275 insertions(+), 109 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 7697e1dfc33..a94e7a51ae1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -22,25 +22,31 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.UnitsConversionUtil; + +import java.util.HashSet; +import java.util.Set; /** - * A {@link ResourceCalculator} which uses the concept of + * A {@link ResourceCalculator} which uses the concept of * dominant resource to compare multi-dimensional resources. * - * Essentially the idea is that the in a multi-resource environment, - * the resource allocation should be determined by the dominant share - * of an entity (user or queue), which is the maximum share that the - * entity has been allocated of any resource. - * - * In a nutshell, it seeks to maximize the minimum dominant share across - * all entities. - * + * Essentially the idea is that the in a multi-resource environment, + * the resource allocation should be determined by the dominant share + * of an entity (user or queue), which is the maximum share that the + * entity has been allocated of any resource. + * + * In a nutshell, it seeks to maximize the minimum dominant share across + * all entities. + * * For example, if user A runs CPU-heavy tasks and user B runs - * memory-heavy tasks, it attempts to equalize CPU share of user A - * with Memory-share of user B. - * + * memory-heavy tasks, it attempts to equalize CPU share of user A + * with Memory-share of user B. + * * In the single resource case, it reduces to max-min fairness for that resource. - * + * * See the Dominant Resource Fairness paper for more details: * www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf */ @@ -50,6 +56,56 @@ public class DominantResourceCalculator extends ResourceCalculator { private static final Log LOG = LogFactory.getLog(DominantResourceCalculator.class); + + private Set resourceNames; + + public DominantResourceCalculator() { + resourceNames = new HashSet<>(); + resourceNames.add(ResourceInformation.MEMORY.getName()); + resourceNames.add(ResourceInformation.VCORES.getName()); + } + + /** + * Compare two resources - if the value for every resource type for the lhs + * is greater than that of the rhs, return 1. If the value for every resource + * type in the lhs is less than the rhs, return -1. Otherwise, return 0 + * + * @param lhs resource to be compared + * @param rhs resource to be compared + * @return 0, 1, or -1 + */ + private int compare(Resource lhs, Resource rhs) { + boolean lhsGreater = false; + boolean rhsGreater = false; + int ret = 0; + + for (String rName : resourceNames) { + try { + ResourceInformation lhsResourceInformation = + lhs.getResourceInformation(rName); + ResourceInformation rhsResourceInformation = + rhs.getResourceInformation(rName); + int diff = lhsResourceInformation.compareTo(rhsResourceInformation); + if (diff >= 1) { + lhsGreater = true; + } else if (diff <= -1) { + rhsGreater = true; + } + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + rName, ye); + } + } + if (lhsGreater && rhsGreater) { + ret = 0; + } else if (lhsGreater) { + ret = 1; + } else if (rhsGreater) { + ret = -1; + } + return ret; + } + @Override public int compare(Resource clusterResource, Resource lhs, Resource rhs, boolean singleType) { @@ -57,25 +113,14 @@ public class DominantResourceCalculator extends ResourceCalculator { if (lhs.equals(rhs)) { return 0; } - + if (isInvalidDivisor(clusterResource)) { - if ((lhs.getMemorySize() < rhs.getMemorySize() && - lhs.getVirtualCores() > rhs.getVirtualCores()) || - (lhs.getMemorySize() > rhs.getMemorySize() && - lhs.getVirtualCores() < rhs.getVirtualCores())) { - return 0; - } else if (lhs.getMemorySize() > rhs.getMemorySize() - || lhs.getVirtualCores() > rhs.getVirtualCores()) { - return 1; - } else if (lhs.getMemorySize() < rhs.getMemorySize() - || lhs.getVirtualCores() < rhs.getVirtualCores()) { - return -1; - } + return this.compare(lhs, rhs); } float l = getResourceAsValue(clusterResource, lhs, true); float r = getResourceAsValue(clusterResource, rhs, true); - + if (l < r) { return -1; } else if (l > r) { @@ -83,75 +128,142 @@ public class DominantResourceCalculator extends ResourceCalculator { } else if (!singleType) { l = getResourceAsValue(clusterResource, lhs, false); r = getResourceAsValue(clusterResource, rhs, false); + if (l < r) { return -1; } else if (l > r) { return 1; } } - + return 0; } /** * Use 'dominant' for now since we only have 2 resources - gives us a slight * performance boost. - * + *

* Once we add more resources, we'll need a more complicated (and slightly * less performant algorithm). */ - protected float getResourceAsValue( - Resource clusterResource, Resource resource, boolean dominant) { - // Just use 'dominant' resource - return (dominant) ? - Math.max( - (float)resource.getMemorySize() / clusterResource.getMemorySize(), - (float)resource.getVirtualCores() / clusterResource.getVirtualCores() - ) - : - Math.min( - (float)resource.getMemorySize() / clusterResource.getMemorySize(), - (float)resource.getVirtualCores() / clusterResource.getVirtualCores() - ); - } - - @Override - public long computeAvailableContainers(Resource available, Resource required) { - return Math.min( - available.getMemorySize() / required.getMemorySize(), - available.getVirtualCores() / required.getVirtualCores()); + protected float getResourceAsValue(Resource clusterResource, + Resource resource, boolean dominant) { + + float min = Float.MAX_VALUE; + float max = 0.0f; + for (String rName : resourceNames) { + try { + ResourceInformation clusterResourceResourceInformation = + clusterResource.getResourceInformation(rName); + ResourceInformation resourceInformation = + resource.getResourceInformation(rName); + Long resourceValue = UnitsConversionUtil + .convert(resourceInformation.getUnits(), + clusterResourceResourceInformation.getUnits(), + resourceInformation.getValue()); + float tmp = + (float) resourceValue / (float) clusterResourceResourceInformation + .getValue(); + min = min < tmp ? min : tmp; + max = max > tmp ? max : tmp; + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return (dominant) ? max : min; } @Override - public float divide(Resource clusterResource, + public long computeAvailableContainers(Resource available, Resource required) { + long min = Long.MAX_VALUE; + for (String resource : resourceNames) { + try { + ResourceInformation availableResource = + available.getResourceInformation(resource); + ResourceInformation requiredResource = + required.getResourceInformation(resource); + Long requiredResourceValue = UnitsConversionUtil + .convert(requiredResource.getUnits(), availableResource.getUnits(), + requiredResource.getValue()); + Long tmp = availableResource.getValue() / requiredResourceValue; + min = min < tmp ? min : tmp; + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + + } + return min > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) min; + } + + @Override + public float divide(Resource clusterResource, Resource numerator, Resource denominator) { - return - getResourceAsValue(clusterResource, numerator, true) / + return + getResourceAsValue(clusterResource, numerator, true) / getResourceAsValue(clusterResource, denominator, true); } - + @Override public boolean isInvalidDivisor(Resource r) { - if (r.getMemorySize() == 0.0f || r.getVirtualCores() == 0.0f) { - return true; + for (String resource : resourceNames) { + try { + if (r.getResourceValue(resource).equals(0L)) { + return true; + } + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource value for " + resource, ye); + } } return false; } @Override public float ratio(Resource a, Resource b) { - return Math.max( - (float)a.getMemorySize()/b.getMemorySize(), - (float)a.getVirtualCores()/b.getVirtualCores() - ); + float ratio = 0.0f; + for (String resource : resourceNames) { + try { + ResourceInformation aResourceInformation = + a.getResourceInformation(resource); + ResourceInformation bResourceInformation = + b.getResourceInformation(resource); + Long bResourceValue = UnitsConversionUtil + .convert(bResourceInformation.getUnits(), + aResourceInformation.getUnits(), + bResourceInformation.getValue()); + float tmp = + (float) aResourceInformation.getValue() / (float) bResourceValue; + ratio = ratio > tmp ? ratio : tmp; + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ratio; } @Override public Resource divideAndCeil(Resource numerator, int denominator) { - return Resources.createResource( - divideAndCeil(numerator.getMemorySize(), denominator), - divideAndCeil(numerator.getVirtualCores(), denominator) - ); + return divideAndCeil(numerator, (long) denominator); + } + + public Resource divideAndCeil(Resource numerator, long denominator) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation resourceInformation = ResourceInformation + .newInstance(numerator.getResourceInformation(resource)); + resourceInformation.setValue( + divideAndCeil(resourceInformation.getValue(), denominator)); + ret.setResourceInformation(resource, resourceInformation); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ret; } @Override @@ -164,73 +276,127 @@ public class DominantResourceCalculator extends ResourceCalculator { @Override public Resource normalize(Resource r, Resource minimumResource, - Resource maximumResource, Resource stepFactor) { - if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) { - Resource step = Resources.clone(stepFactor); - if (stepFactor.getMemorySize() == 0) { - LOG.error("Memory cannot be allocated in increments of zero. Assuming " - + minimumResource.getMemorySize() + "MB increment size. " - + "Please ensure the scheduler configuration is correct."); - step.setMemorySize(minimumResource.getMemorySize()); - } + Resource maximumResource, Resource stepFactor) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation rResourceInformation = + r.getResourceInformation(resource); + ResourceInformation minimumResourceInformation = + minimumResource.getResourceInformation(resource); + ResourceInformation maximumResourceInformation = + maximumResource.getResourceInformation(resource); + ResourceInformation stepFactorResourceInformation = + stepFactor.getResourceInformation(resource); + ResourceInformation tmp = + ResourceInformation.newInstance(rResourceInformation); - if (stepFactor.getVirtualCores() == 0) { - LOG.error("VCore cannot be allocated in increments of zero. Assuming " - + minimumResource.getVirtualCores() + "VCores increment size. " - + "Please ensure the scheduler configuration is correct."); - step.setVirtualCores(minimumResource.getVirtualCores()); - } + Long rValue = rResourceInformation.getValue(); + Long minimumValue = UnitsConversionUtil + .convert(minimumResourceInformation.getUnits(), + rResourceInformation.getUnits(), + minimumResourceInformation.getValue()); + Long maximumValue = UnitsConversionUtil + .convert(maximumResourceInformation.getUnits(), + rResourceInformation.getUnits(), + maximumResourceInformation.getValue()); + Long stepFactorValue = UnitsConversionUtil + .convert(stepFactorResourceInformation.getUnits(), + rResourceInformation.getUnits(), + stepFactorResourceInformation.getValue()); - stepFactor = step; + tmp.setValue( + Math.min(roundUp(Math.max(rValue, minimumValue), stepFactorValue), + maximumValue)); + ret.setResourceInformation(resource, tmp); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } } - - long normalizedMemory = Math.min( - roundUp( - Math.max(r.getMemorySize(), minimumResource.getMemorySize()), - stepFactor.getMemorySize()), - maximumResource.getMemorySize()); - int normalizedCores = Math.min( - roundUp( - Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), - stepFactor.getVirtualCores()), - maximumResource.getVirtualCores()); - return Resources.createResource(normalizedMemory, - normalizedCores); + return ret; } @Override public Resource roundUp(Resource r, Resource stepFactor) { - return Resources.createResource( - roundUp(r.getMemorySize(), stepFactor.getMemorySize()), - roundUp(r.getVirtualCores(), stepFactor.getVirtualCores()) - ); + return this.rounding(r, stepFactor, true); } @Override public Resource roundDown(Resource r, Resource stepFactor) { - return Resources.createResource( - roundDown(r.getMemorySize(), stepFactor.getMemorySize()), - roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()) - ); + return this.rounding(r, stepFactor, false); + } + + private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation rResourceInformation = + r.getResourceInformation(resource); + ResourceInformation stepFactorResourceInformation = + stepFactor.getResourceInformation(resource); + ResourceInformation tmp = + ResourceInformation.newInstance(rResourceInformation); + + Long rValue = rResourceInformation.getValue(); + Long stepFactorValue = UnitsConversionUtil + .convert(stepFactorResourceInformation.getUnits(), + rResourceInformation.getUnits(), + stepFactorResourceInformation.getValue()); + + Long value = roundUp ? roundUp(rValue, stepFactorValue) : + roundDown(rValue, stepFactorValue); + tmp.setValue(value); + ret.setResourceInformation(resource, tmp); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ret; } @Override public Resource multiplyAndNormalizeUp(Resource r, double by, Resource stepFactor) { - return Resources.createResource( - roundUp((long) Math.ceil((float) (r.getMemorySize() * by)), - stepFactor.getMemorySize()), - roundUp((int) Math.ceil((float) (r.getVirtualCores() * by)), - stepFactor.getVirtualCores())); + return this.multiplyAndNormalize(r, by, stepFactor, true); } @Override public Resource multiplyAndNormalizeDown(Resource r, double by, Resource stepFactor) { - return Resources.createResource( - roundDown((long) (r.getMemorySize() * by), stepFactor.getMemorySize()), - roundDown((int) (r.getVirtualCores() * by), - stepFactor.getVirtualCores())); + return this.multiplyAndNormalize(r, by, stepFactor, false); + } + + private Resource multiplyAndNormalize(Resource r, double by, + Resource stepFactor, boolean roundUp) { + Resource ret = Resources.createResource(0, 0); + for (String resource : resourceNames) { + try { + ResourceInformation rResourceInformation = + r.getResourceInformation(resource); + ResourceInformation stepFactorResourceInformation = + stepFactor.getResourceInformation(resource); + ResourceInformation tmp = + ResourceInformation.newInstance(rResourceInformation); + + Long rValue = rResourceInformation.getValue(); + Long stepFactorValue = UnitsConversionUtil + .convert(stepFactorResourceInformation.getUnits(), + rResourceInformation.getUnits(), + stepFactorResourceInformation.getValue()); + + Long value = + roundUp ? roundUp((long) Math.ceil(rValue * by), stepFactorValue) : + roundDown((long) (rValue * by), stepFactorValue); + tmp.setValue(value); + ret.setResourceInformation(resource, tmp); + } catch (YarnException ye) { + throw new IllegalArgumentException( + "Error getting resource information for " + resource, ye); + } + } + return ret; } @Override