YARN-4172. Extend DominantResourceCalculator to account for all resources. (Varun Vasudev via wangda)

(cherry picked from commit 32c91223f1)
This commit is contained in:
Wangda Tan 2016-01-29 10:53:31 +08:00 committed by Jonathan Hung
parent afcf9c55b5
commit be2afb4a5d
1 changed files with 275 additions and 109 deletions

View File

@ -22,25 +22,31 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import java.util.HashSet;
import java.util.Set;
/** /**
* A {@link ResourceCalculator} which uses the concept of * A {@link ResourceCalculator} which uses the concept of
* <em>dominant resource</em> to compare multi-dimensional resources. * <em>dominant resource</em> to compare multi-dimensional resources.
* *
* Essentially the idea is that the in a multi-resource environment, * Essentially the idea is that the in a multi-resource environment,
* the resource allocation should be determined by the dominant share * the resource allocation should be determined by the dominant share
* of an entity (user or queue), which is the maximum share that the * of an entity (user or queue), which is the maximum share that the
* entity has been allocated of any resource. * entity has been allocated of any resource.
* *
* In a nutshell, it seeks to maximize the minimum dominant share across * In a nutshell, it seeks to maximize the minimum dominant share across
* all entities. * all entities.
* *
* For example, if user A runs CPU-heavy tasks and user B runs * For example, if user A runs CPU-heavy tasks and user B runs
* memory-heavy tasks, it attempts to equalize CPU share of user A * memory-heavy tasks, it attempts to equalize CPU share of user A
* with Memory-share of user B. * with Memory-share of user B.
* *
* In the single resource case, it reduces to max-min fairness for that resource. * In the single resource case, it reduces to max-min fairness for that resource.
* *
* See the Dominant Resource Fairness paper for more details: * See the Dominant Resource Fairness paper for more details:
* www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf * www.cs.berkeley.edu/~matei/papers/2011/nsdi_drf.pdf
*/ */
@ -50,6 +56,56 @@ public class DominantResourceCalculator extends ResourceCalculator {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(DominantResourceCalculator.class); LogFactory.getLog(DominantResourceCalculator.class);
private Set<String> resourceNames;
public DominantResourceCalculator() {
resourceNames = new HashSet<>();
resourceNames.add(ResourceInformation.MEMORY.getName());
resourceNames.add(ResourceInformation.VCORES.getName());
}
/**
* Compare two resources - if the value for every resource type for the lhs
* is greater than that of the rhs, return 1. If the value for every resource
* type in the lhs is less than the rhs, return -1. Otherwise, return 0
*
* @param lhs resource to be compared
* @param rhs resource to be compared
* @return 0, 1, or -1
*/
private int compare(Resource lhs, Resource rhs) {
boolean lhsGreater = false;
boolean rhsGreater = false;
int ret = 0;
for (String rName : resourceNames) {
try {
ResourceInformation lhsResourceInformation =
lhs.getResourceInformation(rName);
ResourceInformation rhsResourceInformation =
rhs.getResourceInformation(rName);
int diff = lhsResourceInformation.compareTo(rhsResourceInformation);
if (diff >= 1) {
lhsGreater = true;
} else if (diff <= -1) {
rhsGreater = true;
}
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + rName, ye);
}
}
if (lhsGreater && rhsGreater) {
ret = 0;
} else if (lhsGreater) {
ret = 1;
} else if (rhsGreater) {
ret = -1;
}
return ret;
}
@Override @Override
public int compare(Resource clusterResource, Resource lhs, Resource rhs, public int compare(Resource clusterResource, Resource lhs, Resource rhs,
boolean singleType) { boolean singleType) {
@ -57,25 +113,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
if (lhs.equals(rhs)) { if (lhs.equals(rhs)) {
return 0; return 0;
} }
if (isInvalidDivisor(clusterResource)) { if (isInvalidDivisor(clusterResource)) {
if ((lhs.getMemorySize() < rhs.getMemorySize() && return this.compare(lhs, rhs);
lhs.getVirtualCores() > rhs.getVirtualCores()) ||
(lhs.getMemorySize() > rhs.getMemorySize() &&
lhs.getVirtualCores() < rhs.getVirtualCores())) {
return 0;
} else if (lhs.getMemorySize() > rhs.getMemorySize()
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
return 1;
} else if (lhs.getMemorySize() < rhs.getMemorySize()
|| lhs.getVirtualCores() < rhs.getVirtualCores()) {
return -1;
}
} }
float l = getResourceAsValue(clusterResource, lhs, true); float l = getResourceAsValue(clusterResource, lhs, true);
float r = getResourceAsValue(clusterResource, rhs, true); float r = getResourceAsValue(clusterResource, rhs, true);
if (l < r) { if (l < r) {
return -1; return -1;
} else if (l > r) { } else if (l > r) {
@ -83,75 +128,142 @@ public class DominantResourceCalculator extends ResourceCalculator {
} else if (!singleType) { } else if (!singleType) {
l = getResourceAsValue(clusterResource, lhs, false); l = getResourceAsValue(clusterResource, lhs, false);
r = getResourceAsValue(clusterResource, rhs, false); r = getResourceAsValue(clusterResource, rhs, false);
if (l < r) { if (l < r) {
return -1; return -1;
} else if (l > r) { } else if (l > r) {
return 1; return 1;
} }
} }
return 0; return 0;
} }
/** /**
* Use 'dominant' for now since we only have 2 resources - gives us a slight * Use 'dominant' for now since we only have 2 resources - gives us a slight
* performance boost. * performance boost.
* * <p></p>
* Once we add more resources, we'll need a more complicated (and slightly * Once we add more resources, we'll need a more complicated (and slightly
* less performant algorithm). * less performant algorithm).
*/ */
protected float getResourceAsValue( protected float getResourceAsValue(Resource clusterResource,
Resource clusterResource, Resource resource, boolean dominant) { Resource resource, boolean dominant) {
// Just use 'dominant' resource
return (dominant) ? float min = Float.MAX_VALUE;
Math.max( float max = 0.0f;
(float)resource.getMemorySize() / clusterResource.getMemorySize(), for (String rName : resourceNames) {
(float)resource.getVirtualCores() / clusterResource.getVirtualCores() try {
) ResourceInformation clusterResourceResourceInformation =
: clusterResource.getResourceInformation(rName);
Math.min( ResourceInformation resourceInformation =
(float)resource.getMemorySize() / clusterResource.getMemorySize(), resource.getResourceInformation(rName);
(float)resource.getVirtualCores() / clusterResource.getVirtualCores() Long resourceValue = UnitsConversionUtil
); .convert(resourceInformation.getUnits(),
} clusterResourceResourceInformation.getUnits(),
resourceInformation.getValue());
@Override float tmp =
public long computeAvailableContainers(Resource available, Resource required) { (float) resourceValue / (float) clusterResourceResourceInformation
return Math.min( .getValue();
available.getMemorySize() / required.getMemorySize(), min = min < tmp ? min : tmp;
available.getVirtualCores() / required.getVirtualCores()); max = max > tmp ? max : tmp;
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + resource, ye);
}
}
return (dominant) ? max : min;
} }
@Override @Override
public float divide(Resource clusterResource, public long computeAvailableContainers(Resource available, Resource required) {
long min = Long.MAX_VALUE;
for (String resource : resourceNames) {
try {
ResourceInformation availableResource =
available.getResourceInformation(resource);
ResourceInformation requiredResource =
required.getResourceInformation(resource);
Long requiredResourceValue = UnitsConversionUtil
.convert(requiredResource.getUnits(), availableResource.getUnits(),
requiredResource.getValue());
Long tmp = availableResource.getValue() / requiredResourceValue;
min = min < tmp ? min : tmp;
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + resource, ye);
}
}
return min > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) min;
}
@Override
public float divide(Resource clusterResource,
Resource numerator, Resource denominator) { Resource numerator, Resource denominator) {
return return
getResourceAsValue(clusterResource, numerator, true) / getResourceAsValue(clusterResource, numerator, true) /
getResourceAsValue(clusterResource, denominator, true); getResourceAsValue(clusterResource, denominator, true);
} }
@Override @Override
public boolean isInvalidDivisor(Resource r) { public boolean isInvalidDivisor(Resource r) {
if (r.getMemorySize() == 0.0f || r.getVirtualCores() == 0.0f) { for (String resource : resourceNames) {
return true; try {
if (r.getResourceValue(resource).equals(0L)) {
return true;
}
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource value for " + resource, ye);
}
} }
return false; return false;
} }
@Override @Override
public float ratio(Resource a, Resource b) { public float ratio(Resource a, Resource b) {
return Math.max( float ratio = 0.0f;
(float)a.getMemorySize()/b.getMemorySize(), for (String resource : resourceNames) {
(float)a.getVirtualCores()/b.getVirtualCores() try {
); ResourceInformation aResourceInformation =
a.getResourceInformation(resource);
ResourceInformation bResourceInformation =
b.getResourceInformation(resource);
Long bResourceValue = UnitsConversionUtil
.convert(bResourceInformation.getUnits(),
aResourceInformation.getUnits(),
bResourceInformation.getValue());
float tmp =
(float) aResourceInformation.getValue() / (float) bResourceValue;
ratio = ratio > tmp ? ratio : tmp;
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + resource, ye);
}
}
return ratio;
} }
@Override @Override
public Resource divideAndCeil(Resource numerator, int denominator) { public Resource divideAndCeil(Resource numerator, int denominator) {
return Resources.createResource( return divideAndCeil(numerator, (long) denominator);
divideAndCeil(numerator.getMemorySize(), denominator), }
divideAndCeil(numerator.getVirtualCores(), denominator)
); public Resource divideAndCeil(Resource numerator, long denominator) {
Resource ret = Resources.createResource(0, 0);
for (String resource : resourceNames) {
try {
ResourceInformation resourceInformation = ResourceInformation
.newInstance(numerator.getResourceInformation(resource));
resourceInformation.setValue(
divideAndCeil(resourceInformation.getValue(), denominator));
ret.setResourceInformation(resource, resourceInformation);
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + resource, ye);
}
}
return ret;
} }
@Override @Override
@ -164,73 +276,127 @@ public class DominantResourceCalculator extends ResourceCalculator {
@Override @Override
public Resource normalize(Resource r, Resource minimumResource, public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource, Resource stepFactor) { Resource maximumResource, Resource stepFactor) {
if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) { Resource ret = Resources.createResource(0, 0);
Resource step = Resources.clone(stepFactor); for (String resource : resourceNames) {
if (stepFactor.getMemorySize() == 0) { try {
LOG.error("Memory cannot be allocated in increments of zero. Assuming " ResourceInformation rResourceInformation =
+ minimumResource.getMemorySize() + "MB increment size. " r.getResourceInformation(resource);
+ "Please ensure the scheduler configuration is correct."); ResourceInformation minimumResourceInformation =
step.setMemorySize(minimumResource.getMemorySize()); minimumResource.getResourceInformation(resource);
} ResourceInformation maximumResourceInformation =
maximumResource.getResourceInformation(resource);
ResourceInformation stepFactorResourceInformation =
stepFactor.getResourceInformation(resource);
ResourceInformation tmp =
ResourceInformation.newInstance(rResourceInformation);
if (stepFactor.getVirtualCores() == 0) { Long rValue = rResourceInformation.getValue();
LOG.error("VCore cannot be allocated in increments of zero. Assuming " Long minimumValue = UnitsConversionUtil
+ minimumResource.getVirtualCores() + "VCores increment size. " .convert(minimumResourceInformation.getUnits(),
+ "Please ensure the scheduler configuration is correct."); rResourceInformation.getUnits(),
step.setVirtualCores(minimumResource.getVirtualCores()); minimumResourceInformation.getValue());
} Long maximumValue = UnitsConversionUtil
.convert(maximumResourceInformation.getUnits(),
rResourceInformation.getUnits(),
maximumResourceInformation.getValue());
Long stepFactorValue = UnitsConversionUtil
.convert(stepFactorResourceInformation.getUnits(),
rResourceInformation.getUnits(),
stepFactorResourceInformation.getValue());
stepFactor = step; tmp.setValue(
Math.min(roundUp(Math.max(rValue, minimumValue), stepFactorValue),
maximumValue));
ret.setResourceInformation(resource, tmp);
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + resource, ye);
}
} }
return ret;
long normalizedMemory = Math.min(
roundUp(
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
stepFactor.getMemorySize()),
maximumResource.getMemorySize());
int normalizedCores = Math.min(
roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
stepFactor.getVirtualCores()),
maximumResource.getVirtualCores());
return Resources.createResource(normalizedMemory,
normalizedCores);
} }
@Override @Override
public Resource roundUp(Resource r, Resource stepFactor) { public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource( return this.rounding(r, stepFactor, true);
roundUp(r.getMemorySize(), stepFactor.getMemorySize()),
roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
);
} }
@Override @Override
public Resource roundDown(Resource r, Resource stepFactor) { public Resource roundDown(Resource r, Resource stepFactor) {
return Resources.createResource( return this.rounding(r, stepFactor, false);
roundDown(r.getMemorySize(), stepFactor.getMemorySize()), }
roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
); private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
Resource ret = Resources.createResource(0, 0);
for (String resource : resourceNames) {
try {
ResourceInformation rResourceInformation =
r.getResourceInformation(resource);
ResourceInformation stepFactorResourceInformation =
stepFactor.getResourceInformation(resource);
ResourceInformation tmp =
ResourceInformation.newInstance(rResourceInformation);
Long rValue = rResourceInformation.getValue();
Long stepFactorValue = UnitsConversionUtil
.convert(stepFactorResourceInformation.getUnits(),
rResourceInformation.getUnits(),
stepFactorResourceInformation.getValue());
Long value = roundUp ? roundUp(rValue, stepFactorValue) :
roundDown(rValue, stepFactorValue);
tmp.setValue(value);
ret.setResourceInformation(resource, tmp);
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + resource, ye);
}
}
return ret;
} }
@Override @Override
public Resource multiplyAndNormalizeUp(Resource r, double by, public Resource multiplyAndNormalizeUp(Resource r, double by,
Resource stepFactor) { Resource stepFactor) {
return Resources.createResource( return this.multiplyAndNormalize(r, by, stepFactor, true);
roundUp((long) Math.ceil((float) (r.getMemorySize() * by)),
stepFactor.getMemorySize()),
roundUp((int) Math.ceil((float) (r.getVirtualCores() * by)),
stepFactor.getVirtualCores()));
} }
@Override @Override
public Resource multiplyAndNormalizeDown(Resource r, double by, public Resource multiplyAndNormalizeDown(Resource r, double by,
Resource stepFactor) { Resource stepFactor) {
return Resources.createResource( return this.multiplyAndNormalize(r, by, stepFactor, false);
roundDown((long) (r.getMemorySize() * by), stepFactor.getMemorySize()), }
roundDown((int) (r.getVirtualCores() * by),
stepFactor.getVirtualCores())); private Resource multiplyAndNormalize(Resource r, double by,
Resource stepFactor, boolean roundUp) {
Resource ret = Resources.createResource(0, 0);
for (String resource : resourceNames) {
try {
ResourceInformation rResourceInformation =
r.getResourceInformation(resource);
ResourceInformation stepFactorResourceInformation =
stepFactor.getResourceInformation(resource);
ResourceInformation tmp =
ResourceInformation.newInstance(rResourceInformation);
Long rValue = rResourceInformation.getValue();
Long stepFactorValue = UnitsConversionUtil
.convert(stepFactorResourceInformation.getUnits(),
rResourceInformation.getUnits(),
stepFactorResourceInformation.getValue());
Long value =
roundUp ? roundUp((long) Math.ceil(rValue * by), stepFactorValue) :
roundDown((long) (rValue * by), stepFactorValue);
tmp.setValue(value);
ret.setResourceInformation(resource, tmp);
} catch (YarnException ye) {
throw new IllegalArgumentException(
"Error getting resource information for " + resource, ye);
}
}
return ret;
} }
@Override @Override