YARN-4172. Extend DominantResourceCalculator to account for all resources. (Varun Vasudev via wangda)
(cherry picked from commit 32c91223f1
)
This commit is contained in:
parent
68da5218b7
commit
33fabe1e10
|
@ -22,6 +22,12 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A {@link ResourceCalculator} which uses the concept of
|
||||
|
@ -50,6 +56,56 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
private static final Log LOG =
|
||||
LogFactory.getLog(DominantResourceCalculator.class);
|
||||
|
||||
|
||||
private Set<String> resourceNames;
|
||||
|
||||
public DominantResourceCalculator() {
|
||||
resourceNames = new HashSet<>();
|
||||
resourceNames.add(ResourceInformation.MEMORY.getName());
|
||||
resourceNames.add(ResourceInformation.VCORES.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare two resources - if the value for every resource type for the lhs
|
||||
* is greater than that of the rhs, return 1. If the value for every resource
|
||||
* type in the lhs is less than the rhs, return -1. Otherwise, return 0
|
||||
*
|
||||
* @param lhs resource to be compared
|
||||
* @param rhs resource to be compared
|
||||
* @return 0, 1, or -1
|
||||
*/
|
||||
private int compare(Resource lhs, Resource rhs) {
|
||||
boolean lhsGreater = false;
|
||||
boolean rhsGreater = false;
|
||||
int ret = 0;
|
||||
|
||||
for (String rName : resourceNames) {
|
||||
try {
|
||||
ResourceInformation lhsResourceInformation =
|
||||
lhs.getResourceInformation(rName);
|
||||
ResourceInformation rhsResourceInformation =
|
||||
rhs.getResourceInformation(rName);
|
||||
int diff = lhsResourceInformation.compareTo(rhsResourceInformation);
|
||||
if (diff >= 1) {
|
||||
lhsGreater = true;
|
||||
} else if (diff <= -1) {
|
||||
rhsGreater = true;
|
||||
}
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + rName, ye);
|
||||
}
|
||||
}
|
||||
if (lhsGreater && rhsGreater) {
|
||||
ret = 0;
|
||||
} else if (lhsGreater) {
|
||||
ret = 1;
|
||||
} else if (rhsGreater) {
|
||||
ret = -1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(Resource clusterResource, Resource lhs, Resource rhs,
|
||||
boolean singleType) {
|
||||
|
@ -59,18 +115,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
}
|
||||
|
||||
if (isInvalidDivisor(clusterResource)) {
|
||||
if ((lhs.getMemorySize() < rhs.getMemorySize() &&
|
||||
lhs.getVirtualCores() > rhs.getVirtualCores()) ||
|
||||
(lhs.getMemorySize() > rhs.getMemorySize() &&
|
||||
lhs.getVirtualCores() < rhs.getVirtualCores())) {
|
||||
return 0;
|
||||
} else if (lhs.getMemorySize() > rhs.getMemorySize()
|
||||
|| lhs.getVirtualCores() > rhs.getVirtualCores()) {
|
||||
return 1;
|
||||
} else if (lhs.getMemorySize() < rhs.getMemorySize()
|
||||
|| lhs.getVirtualCores() < rhs.getVirtualCores()) {
|
||||
return -1;
|
||||
}
|
||||
return this.compare(lhs, rhs);
|
||||
}
|
||||
|
||||
float l = getResourceAsValue(clusterResource, lhs, true);
|
||||
|
@ -83,6 +128,7 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
} else if (!singleType) {
|
||||
l = getResourceAsValue(clusterResource, lhs, false);
|
||||
r = getResourceAsValue(clusterResource, rhs, false);
|
||||
|
||||
if (l < r) {
|
||||
return -1;
|
||||
} else if (l > r) {
|
||||
|
@ -96,30 +142,59 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
/**
|
||||
* Use 'dominant' for now since we only have 2 resources - gives us a slight
|
||||
* performance boost.
|
||||
*
|
||||
* <p></p>
|
||||
* Once we add more resources, we'll need a more complicated (and slightly
|
||||
* less performant algorithm).
|
||||
*/
|
||||
protected float getResourceAsValue(
|
||||
Resource clusterResource, Resource resource, boolean dominant) {
|
||||
// Just use 'dominant' resource
|
||||
return (dominant) ?
|
||||
Math.max(
|
||||
(float)resource.getMemorySize() / clusterResource.getMemorySize(),
|
||||
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
|
||||
)
|
||||
:
|
||||
Math.min(
|
||||
(float)resource.getMemorySize() / clusterResource.getMemorySize(),
|
||||
(float)resource.getVirtualCores() / clusterResource.getVirtualCores()
|
||||
);
|
||||
protected float getResourceAsValue(Resource clusterResource,
|
||||
Resource resource, boolean dominant) {
|
||||
|
||||
float min = Float.MAX_VALUE;
|
||||
float max = 0.0f;
|
||||
for (String rName : resourceNames) {
|
||||
try {
|
||||
ResourceInformation clusterResourceResourceInformation =
|
||||
clusterResource.getResourceInformation(rName);
|
||||
ResourceInformation resourceInformation =
|
||||
resource.getResourceInformation(rName);
|
||||
Long resourceValue = UnitsConversionUtil
|
||||
.convert(resourceInformation.getUnits(),
|
||||
clusterResourceResourceInformation.getUnits(),
|
||||
resourceInformation.getValue());
|
||||
float tmp =
|
||||
(float) resourceValue / (float) clusterResourceResourceInformation
|
||||
.getValue();
|
||||
min = min < tmp ? min : tmp;
|
||||
max = max > tmp ? max : tmp;
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + resource, ye);
|
||||
}
|
||||
}
|
||||
return (dominant) ? max : min;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long computeAvailableContainers(Resource available, Resource required) {
|
||||
return Math.min(
|
||||
available.getMemorySize() / required.getMemorySize(),
|
||||
available.getVirtualCores() / required.getVirtualCores());
|
||||
long min = Long.MAX_VALUE;
|
||||
for (String resource : resourceNames) {
|
||||
try {
|
||||
ResourceInformation availableResource =
|
||||
available.getResourceInformation(resource);
|
||||
ResourceInformation requiredResource =
|
||||
required.getResourceInformation(resource);
|
||||
Long requiredResourceValue = UnitsConversionUtil
|
||||
.convert(requiredResource.getUnits(), availableResource.getUnits(),
|
||||
requiredResource.getValue());
|
||||
Long tmp = availableResource.getValue() / requiredResourceValue;
|
||||
min = min < tmp ? min : tmp;
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + resource, ye);
|
||||
}
|
||||
|
||||
}
|
||||
return min > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) min;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -132,26 +207,63 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
|
||||
@Override
|
||||
public boolean isInvalidDivisor(Resource r) {
|
||||
if (r.getMemorySize() == 0.0f || r.getVirtualCores() == 0.0f) {
|
||||
for (String resource : resourceNames) {
|
||||
try {
|
||||
if (r.getResourceValue(resource).equals(0L)) {
|
||||
return true;
|
||||
}
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource value for " + resource, ye);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float ratio(Resource a, Resource b) {
|
||||
return Math.max(
|
||||
(float)a.getMemorySize()/b.getMemorySize(),
|
||||
(float)a.getVirtualCores()/b.getVirtualCores()
|
||||
);
|
||||
float ratio = 0.0f;
|
||||
for (String resource : resourceNames) {
|
||||
try {
|
||||
ResourceInformation aResourceInformation =
|
||||
a.getResourceInformation(resource);
|
||||
ResourceInformation bResourceInformation =
|
||||
b.getResourceInformation(resource);
|
||||
Long bResourceValue = UnitsConversionUtil
|
||||
.convert(bResourceInformation.getUnits(),
|
||||
aResourceInformation.getUnits(),
|
||||
bResourceInformation.getValue());
|
||||
float tmp =
|
||||
(float) aResourceInformation.getValue() / (float) bResourceValue;
|
||||
ratio = ratio > tmp ? ratio : tmp;
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + resource, ye);
|
||||
}
|
||||
}
|
||||
return ratio;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource divideAndCeil(Resource numerator, int denominator) {
|
||||
return Resources.createResource(
|
||||
divideAndCeil(numerator.getMemorySize(), denominator),
|
||||
divideAndCeil(numerator.getVirtualCores(), denominator)
|
||||
);
|
||||
return divideAndCeil(numerator, (long) denominator);
|
||||
}
|
||||
|
||||
public Resource divideAndCeil(Resource numerator, long denominator) {
|
||||
Resource ret = Resources.createResource(0, 0);
|
||||
for (String resource : resourceNames) {
|
||||
try {
|
||||
ResourceInformation resourceInformation = ResourceInformation
|
||||
.newInstance(numerator.getResourceInformation(resource));
|
||||
resourceInformation.setValue(
|
||||
divideAndCeil(resourceInformation.getValue(), denominator));
|
||||
ret.setResourceInformation(resource, resourceInformation);
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + resource, ye);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,72 +277,126 @@ public class DominantResourceCalculator extends ResourceCalculator {
|
|||
@Override
|
||||
public Resource normalize(Resource r, Resource minimumResource,
|
||||
Resource maximumResource, Resource stepFactor) {
|
||||
if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) {
|
||||
Resource step = Resources.clone(stepFactor);
|
||||
if (stepFactor.getMemorySize() == 0) {
|
||||
LOG.error("Memory cannot be allocated in increments of zero. Assuming "
|
||||
+ minimumResource.getMemorySize() + "MB increment size. "
|
||||
+ "Please ensure the scheduler configuration is correct.");
|
||||
step.setMemorySize(minimumResource.getMemorySize());
|
||||
}
|
||||
Resource ret = Resources.createResource(0, 0);
|
||||
for (String resource : resourceNames) {
|
||||
try {
|
||||
ResourceInformation rResourceInformation =
|
||||
r.getResourceInformation(resource);
|
||||
ResourceInformation minimumResourceInformation =
|
||||
minimumResource.getResourceInformation(resource);
|
||||
ResourceInformation maximumResourceInformation =
|
||||
maximumResource.getResourceInformation(resource);
|
||||
ResourceInformation stepFactorResourceInformation =
|
||||
stepFactor.getResourceInformation(resource);
|
||||
ResourceInformation tmp =
|
||||
ResourceInformation.newInstance(rResourceInformation);
|
||||
|
||||
if (stepFactor.getVirtualCores() == 0) {
|
||||
LOG.error("VCore cannot be allocated in increments of zero. Assuming "
|
||||
+ minimumResource.getVirtualCores() + "VCores increment size. "
|
||||
+ "Please ensure the scheduler configuration is correct.");
|
||||
step.setVirtualCores(minimumResource.getVirtualCores());
|
||||
}
|
||||
Long rValue = rResourceInformation.getValue();
|
||||
Long minimumValue = UnitsConversionUtil
|
||||
.convert(minimumResourceInformation.getUnits(),
|
||||
rResourceInformation.getUnits(),
|
||||
minimumResourceInformation.getValue());
|
||||
Long maximumValue = UnitsConversionUtil
|
||||
.convert(maximumResourceInformation.getUnits(),
|
||||
rResourceInformation.getUnits(),
|
||||
maximumResourceInformation.getValue());
|
||||
Long stepFactorValue = UnitsConversionUtil
|
||||
.convert(stepFactorResourceInformation.getUnits(),
|
||||
rResourceInformation.getUnits(),
|
||||
stepFactorResourceInformation.getValue());
|
||||
|
||||
stepFactor = step;
|
||||
tmp.setValue(
|
||||
Math.min(roundUp(Math.max(rValue, minimumValue), stepFactorValue),
|
||||
maximumValue));
|
||||
ret.setResourceInformation(resource, tmp);
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + resource, ye);
|
||||
}
|
||||
|
||||
long normalizedMemory = Math.min(
|
||||
roundUp(
|
||||
Math.max(r.getMemorySize(), minimumResource.getMemorySize()),
|
||||
stepFactor.getMemorySize()),
|
||||
maximumResource.getMemorySize());
|
||||
int normalizedCores = Math.min(
|
||||
roundUp(
|
||||
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
|
||||
stepFactor.getVirtualCores()),
|
||||
maximumResource.getVirtualCores());
|
||||
return Resources.createResource(normalizedMemory,
|
||||
normalizedCores);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource roundUp(Resource r, Resource stepFactor) {
|
||||
return Resources.createResource(
|
||||
roundUp(r.getMemorySize(), stepFactor.getMemorySize()),
|
||||
roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
|
||||
);
|
||||
return this.rounding(r, stepFactor, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource roundDown(Resource r, Resource stepFactor) {
|
||||
return Resources.createResource(
|
||||
roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
|
||||
roundDown(r.getVirtualCores(), stepFactor.getVirtualCores())
|
||||
);
|
||||
return this.rounding(r, stepFactor, false);
|
||||
}
|
||||
|
||||
private Resource rounding(Resource r, Resource stepFactor, boolean roundUp) {
|
||||
Resource ret = Resources.createResource(0, 0);
|
||||
for (String resource : resourceNames) {
|
||||
try {
|
||||
ResourceInformation rResourceInformation =
|
||||
r.getResourceInformation(resource);
|
||||
ResourceInformation stepFactorResourceInformation =
|
||||
stepFactor.getResourceInformation(resource);
|
||||
ResourceInformation tmp =
|
||||
ResourceInformation.newInstance(rResourceInformation);
|
||||
|
||||
Long rValue = rResourceInformation.getValue();
|
||||
Long stepFactorValue = UnitsConversionUtil
|
||||
.convert(stepFactorResourceInformation.getUnits(),
|
||||
rResourceInformation.getUnits(),
|
||||
stepFactorResourceInformation.getValue());
|
||||
|
||||
Long value = roundUp ? roundUp(rValue, stepFactorValue) :
|
||||
roundDown(rValue, stepFactorValue);
|
||||
tmp.setValue(value);
|
||||
ret.setResourceInformation(resource, tmp);
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + resource, ye);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource multiplyAndNormalizeUp(Resource r, double by,
|
||||
Resource stepFactor) {
|
||||
return Resources.createResource(
|
||||
roundUp((long) Math.ceil((float) (r.getMemorySize() * by)),
|
||||
stepFactor.getMemorySize()),
|
||||
roundUp((int) Math.ceil((float) (r.getVirtualCores() * by)),
|
||||
stepFactor.getVirtualCores()));
|
||||
return this.multiplyAndNormalize(r, by, stepFactor, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource multiplyAndNormalizeDown(Resource r, double by,
|
||||
Resource stepFactor) {
|
||||
return Resources.createResource(
|
||||
roundDown((long) (r.getMemorySize() * by), stepFactor.getMemorySize()),
|
||||
roundDown((int) (r.getVirtualCores() * by),
|
||||
stepFactor.getVirtualCores()));
|
||||
return this.multiplyAndNormalize(r, by, stepFactor, false);
|
||||
}
|
||||
|
||||
private Resource multiplyAndNormalize(Resource r, double by,
|
||||
Resource stepFactor, boolean roundUp) {
|
||||
Resource ret = Resources.createResource(0, 0);
|
||||
for (String resource : resourceNames) {
|
||||
try {
|
||||
ResourceInformation rResourceInformation =
|
||||
r.getResourceInformation(resource);
|
||||
ResourceInformation stepFactorResourceInformation =
|
||||
stepFactor.getResourceInformation(resource);
|
||||
ResourceInformation tmp =
|
||||
ResourceInformation.newInstance(rResourceInformation);
|
||||
|
||||
Long rValue = rResourceInformation.getValue();
|
||||
Long stepFactorValue = UnitsConversionUtil
|
||||
.convert(stepFactorResourceInformation.getUnits(),
|
||||
rResourceInformation.getUnits(),
|
||||
stepFactorResourceInformation.getValue());
|
||||
|
||||
Long value =
|
||||
roundUp ? roundUp((long) Math.ceil(rValue * by), stepFactorValue) :
|
||||
roundDown((long) (rValue * by), stepFactorValue);
|
||||
tmp.setValue(value);
|
||||
ret.setResourceInformation(resource, tmp);
|
||||
} catch (YarnException ye) {
|
||||
throw new IllegalArgumentException(
|
||||
"Error getting resource information for " + resource, ye);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue