YARN-9360. Do not expose innards of QueueMetrics object into FSLeafQueue#computeMaxAMResource. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2019-07-15 10:47:10 +02:00
parent 30a8f840f1
commit 91ce09e706
2 changed files with 38 additions and 29 deletions

View File

@ -833,7 +833,37 @@ public long getAggregatePreemptedContainers() {
return aggregateContainersPreempted.value();
}
public QueueMetricsForCustomResources getQueueMetricsForCustomResources() {
return queueMetricsForCustomResources;
/**
* Fills in Resource values from available metrics values of custom resources
* to @code{targetResource}, only if the corresponding
* value of @code{targetResource} is zero.
* If @code{fromResource} has a value less than the available metrics value
* for a particular resource, it will be set to the @code{targetResource}
* instead.
*
* @param fromResource The resource to compare available resource values with.
* @param targetResource The resource to save the values into.
*/
public void fillInValuesFromAvailableResources(Resource fromResource,
Resource targetResource) {
if (queueMetricsForCustomResources != null) {
QueueMetricsCustomResource availableResources =
queueMetricsForCustomResources.getAvailable();
// We expect all custom resources contained in availableResources,
// so we will loop through all of them.
for (Map.Entry<String, Long> availableEntry : availableResources
.getValues().entrySet()) {
String resourceName = availableEntry.getKey();
// We only update the value if fairshare is 0 for that resource.
if (targetResource.getResourceValue(resourceName) == 0) {
Long availableValue = availableEntry.getValue();
long value = Math.min(availableValue,
fromResource.getResourceValue(resourceName));
targetResource.setResourceValue(resourceName, value);
}
}
}
}
}

View File

@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -43,8 +42,6 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -507,40 +504,22 @@ public ActiveUsersManager getAbstractUsersManager() {
*/
private Resource computeMaxAMResource() {
Resource maxResource = Resources.clone(getFairShare());
Resource maxShare = getMaxShare();
if (maxResource.getMemorySize() == 0) {
maxResource.setMemorySize(
Math.min(scheduler.getRootQueueMetrics().getAvailableMB(),
getMaxShare().getMemorySize()));
maxShare.getMemorySize()));
}
if (maxResource.getVirtualCores() == 0) {
maxResource.setVirtualCores(Math.min(
scheduler.getRootQueueMetrics().getAvailableVirtualCores(),
getMaxShare().getVirtualCores()));
maxShare.getVirtualCores()));
}
QueueMetricsForCustomResources metricsForCustomResources =
scheduler.getRootQueueMetrics().getQueueMetricsForCustomResources();
if (metricsForCustomResources != null) {
QueueMetricsCustomResource availableResources =
metricsForCustomResources.getAvailable();
// We expect all custom resources contained in availableResources,
// so we will loop through all of them.
for (Map.Entry<String, Long> availableEntry : availableResources
.getValues().entrySet()) {
String resourceName = availableEntry.getKey();
// We only update the value if fairshare is 0 for that resource.
if (maxResource.getResourceValue(resourceName) == 0) {
Long availableValue = availableEntry.getValue();
long value = Math.min(availableValue,
getMaxShare().getResourceValue(resourceName));
maxResource.setResourceValue(resourceName, value);
}
}
}
scheduler.getRootQueueMetrics()
.fillInValuesFromAvailableResources(maxShare, maxResource);
// Round up to allow AM to run when there is only one vcore on the cluster
return Resources.multiplyAndRoundUp(maxResource, maxAMShare);