YARN-2162. Add ability in Fair Scheduler to optionally configure maxResources in terms of percentage. (Yufei Gu)

This commit is contained in:
Yufei Gu 2017-10-06 00:24:51 -07:00
parent 3106552b75
commit 9bb9831347
19 changed files with 366 additions and 110 deletions

View File

@ -32,8 +32,6 @@ import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -41,15 +39,13 @@ import com.google.common.annotations.VisibleForTesting;
public class AllocationConfiguration extends ReservationSchedulerConfiguration { public class AllocationConfiguration extends ReservationSchedulerConfiguration {
private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
private static final ResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator();
// Minimum resource allocation for each queue // Minimum resource allocation for each queue
private final Map<String, Resource> minQueueResources; private final Map<String, Resource> minQueueResources;
// Maximum amount of resources per queue // Maximum amount of resources per queue
@VisibleForTesting @VisibleForTesting
final Map<String, Resource> maxQueueResources; final Map<String, ConfigurableResource> maxQueueResources;
// Maximum amount of resources for each queue's ad hoc children // Maximum amount of resources for each queue's ad hoc children
private final Map<String, Resource> maxChildQueueResources; private final Map<String, ConfigurableResource> maxChildQueueResources;
// Sharing weights for each queue // Sharing weights for each queue
private final Map<String, ResourceWeights> queueWeights; private final Map<String, ResourceWeights> queueWeights;
@ -61,7 +57,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
final Map<String, Integer> userMaxApps; final Map<String, Integer> userMaxApps;
private final int userMaxAppsDefault; private final int userMaxAppsDefault;
private final int queueMaxAppsDefault; private final int queueMaxAppsDefault;
private final Resource queueMaxResourcesDefault; private final ConfigurableResource queueMaxResourcesDefault;
// Maximum resource share for each leaf queue that can be used to run AMs // Maximum resource share for each leaf queue that can be used to run AMs
final Map<String, Float> queueMaxAMShares; final Map<String, Float> queueMaxAMShares;
@ -110,12 +106,12 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
private final Set<String> nonPreemptableQueues; private final Set<String> nonPreemptableQueues;
public AllocationConfiguration(Map<String, Resource> minQueueResources, public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, ConfigurableResource> maxQueueResources,
Map<String, Resource> maxChildQueueResources, Map<String, ConfigurableResource> maxChildQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, ResourceWeights> queueWeights, Map<String, ResourceWeights> queueWeights,
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault, Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
int queueMaxAppsDefault, Resource queueMaxResourcesDefault, int queueMaxAppsDefault, ConfigurableResource queueMaxResourcesDefault,
float queueMaxAMShareDefault, float queueMaxAMShareDefault,
Map<String, SchedulingPolicy> schedulingPolicies, Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy, SchedulingPolicy defaultSchedulingPolicy,
@ -164,7 +160,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
queueMaxAMShares = new HashMap<>(); queueMaxAMShares = new HashMap<>();
userMaxAppsDefault = Integer.MAX_VALUE; userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE;
queueMaxResourcesDefault = Resources.unbounded(); queueMaxResourcesDefault = new ConfigurableResource(Resources.unbounded());
queueMaxAMShareDefault = 0.5f; queueMaxAMShareDefault = 0.5f;
queueAcls = new HashMap<>(); queueAcls = new HashMap<>();
resAcls = new HashMap<>(); resAcls = new HashMap<>();
@ -290,26 +286,18 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
/** /**
* Get the maximum resource allocation for the given queue. If the max in not * Get the maximum resource allocation for the given queue. If the max in not
* set, return the larger of the min and the default max. * set, return the default max.
* *
* @param queue the target queue's name * @param queue the target queue's name
* @return the max allocation on this queue * @return the max allocation on this queue
*/ */
@VisibleForTesting @VisibleForTesting
Resource getMaxResources(String queue) { ConfigurableResource getMaxResources(String queue) {
Resource maxQueueResource = maxQueueResources.get(queue); ConfigurableResource maxQueueResource = maxQueueResources.get(queue);
if (maxQueueResource == null) { if (maxQueueResource == null) {
Resource minQueueResource = minQueueResources.get(queue); maxQueueResource = queueMaxResourcesDefault;
if (minQueueResource != null &&
Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(),
minQueueResource, queueMaxResourcesDefault)) {
return minQueueResource;
} else {
return queueMaxResourcesDefault;
}
} else {
return maxQueueResource;
} }
return maxQueueResource;
} }
/** /**
@ -319,7 +307,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
* @return the max allocation on this queue or null if not set * @return the max allocation on this queue or null if not set
*/ */
@VisibleForTesting @VisibleForTesting
Resource getMaxChildResources(String queue) { ConfigurableResource getMaxChildResources(String queue) {
return maxChildQueueResources.get(queue); return maxChildQueueResources.get(queue);
} }
@ -417,9 +405,9 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
queue.setMaxChildQueueResource(getMaxChildResources(name)); queue.setMaxChildQueueResource(getMaxChildResources(name));
// Set queue metrics. // Set queue metrics.
queue.getMetrics().setMinShare(getMinResources(name)); queue.getMetrics().setMinShare(queue.getMinShare());
queue.getMetrics().setMaxShare(getMaxResources(name)); queue.getMetrics().setMaxShare(queue.getMaxShare());
queue.getMetrics().setMaxApps(getQueueMaxApps(name)); queue.getMetrics().setMaxApps(queue.getMaxRunningApps());
queue.getMetrics().setSchedulingPolicy(getSchedulingPolicy(name).getName()); queue.getMetrics().setSchedulingPolicy(getSchedulingPolicy(name).getName());
} }
} }

View File

@ -227,8 +227,8 @@ public class AllocationFileLoaderService extends AbstractService {
// Create some temporary hashmaps to hold the new allocs, and we only save // Create some temporary hashmaps to hold the new allocs, and we only save
// them in our fields if we have parsed the entire allocs file successfully. // them in our fields if we have parsed the entire allocs file successfully.
Map<String, Resource> minQueueResources = new HashMap<>(); Map<String, Resource> minQueueResources = new HashMap<>();
Map<String, Resource> maxQueueResources = new HashMap<>(); Map<String, ConfigurableResource> maxQueueResources = new HashMap<>();
Map<String, Resource> maxChildQueueResources = new HashMap<>(); Map<String, ConfigurableResource> maxChildQueueResources = new HashMap<>();
Map<String, Integer> queueMaxApps = new HashMap<>(); Map<String, Integer> queueMaxApps = new HashMap<>();
Map<String, Integer> userMaxApps = new HashMap<>(); Map<String, Integer> userMaxApps = new HashMap<>();
Map<String, Float> queueMaxAMShares = new HashMap<>(); Map<String, Float> queueMaxAMShares = new HashMap<>();
@ -245,7 +245,8 @@ public class AllocationFileLoaderService extends AbstractService {
Set<String> nonPreemptableQueues = new HashSet<>(); Set<String> nonPreemptableQueues = new HashSet<>();
int userMaxAppsDefault = Integer.MAX_VALUE; int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE;
Resource queueMaxResourcesDefault = Resources.unbounded(); ConfigurableResource queueMaxResourcesDefault =
new ConfigurableResource(Resources.unbounded());
float queueMaxAMShareDefault = 0.5f; float queueMaxAMShareDefault = 0.5f;
long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
@ -306,7 +307,7 @@ public class AllocationFileLoaderService extends AbstractService {
} }
} else if ("queueMaxResourcesDefault".equals(element.getTagName())) { } else if ("queueMaxResourcesDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim(); String text = ((Text)element.getFirstChild()).getData().trim();
Resource val = ConfigurableResource val =
FairSchedulerConfiguration.parseResourceConfigValue(text); FairSchedulerConfiguration.parseResourceConfigValue(text);
queueMaxResourcesDefault = val; queueMaxResourcesDefault = val;
} else if ("userMaxAppsDefault".equals(element.getTagName())) { } else if ("userMaxAppsDefault".equals(element.getTagName())) {
@ -449,8 +450,8 @@ public class AllocationFileLoaderService extends AbstractService {
*/ */
private void loadQueue(String parentName, Element element, private void loadQueue(String parentName, Element element,
Map<String, Resource> minQueueResources, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, ConfigurableResource> maxQueueResources,
Map<String, Resource> maxChildQueueResources, Map<String, ConfigurableResource> maxChildQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, Integer> userMaxApps,
Map<String, Float> queueMaxAMShares, Map<String, Float> queueMaxAMShares,
@ -497,17 +498,17 @@ public class AllocationFileLoaderService extends AbstractService {
Element field = (Element) fieldNode; Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) { if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim(); String text = ((Text)field.getFirstChild()).getData().trim();
Resource val = ConfigurableResource val =
FairSchedulerConfiguration.parseResourceConfigValue(text); FairSchedulerConfiguration.parseResourceConfigValue(text);
minQueueResources.put(queueName, val); minQueueResources.put(queueName, val.getResource());
} else if ("maxResources".equals(field.getTagName())) { } else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim(); String text = ((Text)field.getFirstChild()).getData().trim();
Resource val = ConfigurableResource val =
FairSchedulerConfiguration.parseResourceConfigValue(text); FairSchedulerConfiguration.parseResourceConfigValue(text);
maxQueueResources.put(queueName, val); maxQueueResources.put(queueName, val);
} else if ("maxChildResources".equals(field.getTagName())) { } else if ("maxChildResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim(); String text = ((Text)field.getFirstChild()).getData().trim();
Resource val = ConfigurableResource val =
FairSchedulerConfiguration.parseResourceConfigValue(text); FairSchedulerConfiguration.parseResourceConfigValue(text);
maxChildQueueResources.put(queueName, val); maxChildQueueResources.put(queueName, val);
} else if ("maxRunningApps".equals(field.getTagName())) { } else if ("maxRunningApps".equals(field.getTagName())) {
@ -607,14 +608,24 @@ public class AllocationFileLoaderService extends AbstractService {
queueAcls.put(queueName, acls); queueAcls.put(queueName, acls);
resAcls.put(queueName, racls); resAcls.put(queueName, racls);
if (maxQueueResources.containsKey(queueName) && checkMinAndMaxResource(minQueueResources, maxQueueResources, queueName);
minQueueResources.containsKey(queueName) }
&& !Resources.fitsIn(minQueueResources.get(queueName),
maxQueueResources.get(queueName))) { private void checkMinAndMaxResource(Map<String, Resource> minResources,
LOG.warn( Map<String, ConfigurableResource> maxResources, String queueName) {
String.format("Queue %s has max resources %s less than "
+ "min resources %s", queueName, maxQueueResources.get(queueName), ConfigurableResource maxConfigurableResource = maxResources.get(queueName);
minQueueResources.get(queueName))); Resource minResource = minResources.get(queueName);
if (maxConfigurableResource != null && minResource != null) {
Resource maxResource = maxConfigurableResource.getResource();
// check whether max resource is bigger or equals to min resource when max
// resource are absolute values
if (maxResource != null && !Resources.fitsIn(minResource, maxResource)) {
LOG.warn(String.format("Queue %s has max resources %s less than "
+ "min resources %s", queueName, maxResource, minResource));
}
} }
} }

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* A {@code ConfigurableResource} object represents an entity that is used to
* configure resources, such as maximum resources of a queue. It can be
* percentage of cluster resources or an absolute value.
*/
@Private
@Unstable
public class ConfigurableResource {
private final Resource resource;
private final double[] percentages;
public ConfigurableResource(double[] percentages) {
this.percentages = percentages.clone();
this.resource = null;
}
public ConfigurableResource(Resource resource) {
this.percentages = null;
this.resource = resource;
}
/**
* Get resource by multiplying the cluster resource and the percentage of
* each resource respectively. Return the absolute resource if either
* {@code percentages} or {@code clusterResource) is null.
*
* @param clusterResource the cluster resource
* @return resource
*/
public Resource getResource(Resource clusterResource) {
if (percentages != null && clusterResource != null) {
long memory = (long) (clusterResource.getMemorySize() * percentages[0]);
int vcore = (int) (clusterResource.getVirtualCores() * percentages[1]);
return Resource.newInstance(memory, vcore);
} else {
return resource;
}
}
/**
* Get the absolute resource.
*
* @return the absolute resource
*/
public Resource getResource() {
return resource;
}
}

View File

@ -343,10 +343,10 @@ public class FSLeafQueue extends FSQueue {
readLock.unlock(); readLock.unlock();
} }
// Cap demand to maxShare to limit allocation to maxShare // Cap demand to maxShare to limit allocation to maxShare
demand = Resources.componentwiseMin(tmpDemand, maxShare); demand = Resources.componentwiseMin(tmpDemand, getMaxShare());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxShare); + "; the max is " + getMaxShare());
LOG.debug("The updated fairshare for " + getName() + " is " LOG.debug("The updated fairshare for " + getName() + " is "
+ getFairShare()); + getFairShare());
} }
@ -622,7 +622,7 @@ public class FSLeafQueue extends FSQueue {
", Policy: " + policy.getName() + ", Policy: " + policy.getName() +
", FairShare: " + getFairShare() + ", FairShare: " + getFairShare() +
", SteadyFairShare: " + getSteadyFairShare() + ", SteadyFairShare: " + getSteadyFairShare() +
", MaxShare: " + maxShare + ", MaxShare: " + getMaxShare() +
", MinShare: " + minShare + ", MinShare: " + minShare +
", ResourceUsage: " + getResourceUsage() + ", ResourceUsage: " + getResourceUsage() +
", Demand: " + getDemand() + ", Demand: " + getDemand() +

View File

@ -150,13 +150,13 @@ public class FSParentQueue extends FSQueue {
} }
} }
// Cap demand to maxShare to limit allocation to maxShare // Cap demand to maxShare to limit allocation to maxShare
demand = Resources.componentwiseMin(demand, maxShare); demand = Resources.componentwiseMin(demand, getMaxShare());
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand + LOG.debug("The updated demand for " + getName() + " is " + demand +
"; the max is " + maxShare); "; the max is " + getMaxShare());
} }
} }
@ -300,7 +300,7 @@ public class FSParentQueue extends FSQueue {
", Policy: " + policy.getName() + ", Policy: " + policy.getName() +
", FairShare: " + getFairShare() + ", FairShare: " + getFairShare() +
", SteadyFairShare: " + getSteadyFairShare() + ", SteadyFairShare: " + getSteadyFairShare() +
", MaxShare: " + maxShare + ", MaxShare: " + getMaxShare() +
", MinShare: " + minShare + ", MinShare: " + minShare +
", ResourceUsage: " + getResourceUsage() + ", ResourceUsage: " + getResourceUsage() +
", Demand: " + getDemand() + ", Demand: " + getDemand() +

View File

@ -72,9 +72,9 @@ public abstract class FSQueue implements Queue, Schedulable {
protected ResourceWeights weights; protected ResourceWeights weights;
protected Resource minShare; protected Resource minShare;
protected Resource maxShare; private ConfigurableResource maxShare;
protected int maxRunningApps; protected int maxRunningApps;
protected Resource maxChildQueueResource; private ConfigurableResource maxChildQueueResource;
// maxAMShare is a value between 0 and 1. // maxAMShare is a value between 0 and 1.
protected float maxAMShare; protected float maxAMShare;
@ -106,7 +106,7 @@ public abstract class FSQueue implements Queue, Schedulable {
* *
* @param recursive whether child queues should be reinitialized recursively * @param recursive whether child queues should be reinitialized recursively
*/ */
public void reinit(boolean recursive) { public final void reinit(boolean recursive) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
allocConf.initFSQueue(this); allocConf.initFSQueue(this);
updatePreemptionVariables(); updatePreemptionVariables();
@ -158,26 +158,35 @@ public abstract class FSQueue implements Queue, Schedulable {
return minShare; return minShare;
} }
public void setMaxShare(Resource maxShare){ public void setMaxShare(ConfigurableResource maxShare){
this.maxShare = maxShare; this.maxShare = maxShare;
} }
@Override
public Resource getMaxShare() {
Resource maxResource = maxShare.getResource(scheduler.getClusterResource());
// Max resource should be greater than or equal to min resource
Resource result = Resources.componentwiseMax(maxResource, minShare);
if (!Resources.equals(maxResource, result)) {
LOG.warn(String.format("Queue %s has max resources %s less than "
+ "min resources %s", getName(), maxResource, minShare));
}
return result;
}
public Resource getReservedResource() { public Resource getReservedResource() {
reservedResource.setMemorySize(metrics.getReservedMB()); reservedResource.setMemorySize(metrics.getReservedMB());
reservedResource.setVirtualCores(metrics.getReservedVirtualCores()); reservedResource.setVirtualCores(metrics.getReservedVirtualCores());
return reservedResource; return reservedResource;
} }
@Override public void setMaxChildQueueResource(ConfigurableResource maxChildShare){
public Resource getMaxShare() {
return maxShare;
}
public void setMaxChildQueueResource(Resource maxChildShare){
this.maxChildQueueResource = maxChildShare; this.maxChildQueueResource = maxChildShare;
} }
public Resource getMaxChildQueueResource() { public ConfigurableResource getMaxChildQueueResource() {
return maxChildQueueResource; return maxChildQueueResource;
} }
@ -416,7 +425,7 @@ public abstract class FSQueue implements Queue, Schedulable {
+ " because it has reserved containers."); + " because it has reserved containers.");
} }
return false; return false;
} else if (!Resources.fitsIn(getResourceUsage(), maxShare)) { } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Assigning container failed on node '" + node.getNodeName() LOG.debug("Assigning container failed on node '" + node.getNodeName()
+ " because queue resource usage is larger than MaxShare: " + " because queue resource usage is larger than MaxShare: "

View File

@ -1518,7 +1518,8 @@ public class FairScheduler extends
if ((queue.getParent() != null) && if ((queue.getParent() != null) &&
!configuredLeafQueues.contains(queue.getName()) && !configuredLeafQueues.contains(queue.getName()) &&
!configuredParentQueues.contains(queue.getName())) { !configuredParentQueues.contains(queue.getName())) {
Resource max = queue.getParent().getMaxChildQueueResource(); ConfigurableResource max = queue.getParent().
getMaxChildQueueResource();
if (max != null) { if (max != null) {
queue.setMaxShare(max); queue.setMaxShare(max);

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -287,19 +288,60 @@ public class FairSchedulerConfiguration extends Configuration {
* *
* @throws AllocationConfigurationException * @throws AllocationConfigurationException
*/ */
public static Resource parseResourceConfigValue(String val) public static ConfigurableResource parseResourceConfigValue(String val)
throws AllocationConfigurationException { throws AllocationConfigurationException {
ConfigurableResource configurableResource;
try { try {
val = StringUtils.toLowerCase(val); val = StringUtils.toLowerCase(val);
int memory = findResource(val, "mb"); if (val.contains("%")) {
int vcores = findResource(val, "vcores"); configurableResource = new ConfigurableResource(
return BuilderUtils.newResource(memory, vcores); getResourcePercentage(val));
} else {
int memory = findResource(val, "mb");
int vcores = findResource(val, "vcores");
configurableResource = new ConfigurableResource(
BuilderUtils.newResource(memory, vcores));
}
} catch (AllocationConfigurationException ex) { } catch (AllocationConfigurationException ex) {
throw ex; throw ex;
} catch (Exception ex) { } catch (Exception ex) {
throw new AllocationConfigurationException( throw new AllocationConfigurationException(
"Error reading resource config", ex); "Error reading resource config", ex);
} }
return configurableResource;
}
private static double[] getResourcePercentage(
String val) throws AllocationConfigurationException {
double[] resourcePercentage = new double[ResourceType.values().length];
String[] strings = val.split(",");
if (strings.length == 1) {
double percentage = findPercentage(strings[0], "");
for (int i = 0; i < ResourceType.values().length; i++) {
resourcePercentage[i] = percentage/100;
}
} else {
resourcePercentage[0] = findPercentage(val, "memory")/100;
resourcePercentage[1] = findPercentage(val, "cpu")/100;
}
return resourcePercentage;
}
private static double findPercentage(String val, String units)
throws AllocationConfigurationException {
final Pattern pattern =
Pattern.compile("((\\d+)(\\.\\d*)?)\\s*%\\s*" + units);
Matcher matcher = pattern.matcher(val);
if (!matcher.find()) {
if (units.equals("")) {
throw new AllocationConfigurationException("Invalid percentage: " +
val);
} else {
throw new AllocationConfigurationException("Missing resource: " +
units);
}
}
return Double.parseDouble(matcher.group(1));
} }
public long getUpdateInterval() { public long getUpdateInterval() {
@ -307,8 +349,8 @@ public class FairSchedulerConfiguration extends Configuration {
} }
private static int findResource(String val, String units) private static int findResource(String val, String units)
throws AllocationConfigurationException { throws AllocationConfigurationException {
Pattern pattern = Pattern.compile("(\\d+)(\\.\\d*)?\\s*" + units); final Pattern pattern = Pattern.compile("(\\d+)(\\.\\d*)?\\s*" + units);
Matcher matcher = pattern.matcher(val); Matcher matcher = pattern.matcher(val);
if (!matcher.find()) { if (!matcher.find()) {
throw new AllocationConfigurationException("Missing resource: " + units); throw new AllocationConfigurationException("Missing resource: " + units);

View File

@ -41,7 +41,6 @@ import org.xml.sax.SAXException;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.api.records.Resource;
/** /**
* Maintains a list of queues as well as scheduling parameters for each queue, * Maintains a list of queues as well as scheduling parameters for each queue,
@ -330,7 +329,7 @@ public class QueueManager {
!configuredQueues.get(FSQueueType.PARENT).contains(child.getName())) { !configuredQueues.get(FSQueueType.PARENT).contains(child.getName())) {
// For ad hoc queues, set their max reource allocations based on // For ad hoc queues, set their max reource allocations based on
// their parents' default child settings. // their parents' default child settings.
Resource maxChild = parent.getMaxChildQueueResource(); ConfigurableResource maxChild = parent.getMaxChildQueueResource();
if (maxChild != null) { if (maxChild != null) {
child.setMaxShare(maxChild); child.setMaxShare(maxChild);

View File

@ -235,21 +235,21 @@ public class TestAllocationFileLoaderService {
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(2048, 10), assertEquals(Resources.createResource(2048, 10),
queueConf.getMaxResources("root.queueA")); queueConf.getMaxResources("root.queueA").getResource());
assertEquals(Resources.createResource(5120, 110), assertEquals(Resources.createResource(5120, 110),
queueConf.getMaxResources("root.queueB")); queueConf.getMaxResources("root.queueB").getResource());
assertEquals(Resources.createResource(5120, 0),
queueConf.getMaxResources("root.queueC"));
assertEquals(Resources.createResource(4096, 100), assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueD")); queueConf.getMaxResources("root.queueC").getResource());
assertEquals(Resources.createResource(4096, 100), assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueE")); queueConf.getMaxResources("root.queueD").getResource());
assertEquals(Resources.createResource(4096, 100), assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueF")); queueConf.getMaxResources("root.queueE").getResource());
assertEquals(Resources.createResource(4096, 100), assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueG")); queueConf.getMaxResources("root.queueF").getResource());
assertEquals(Resources.createResource(4096, 100), assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueG.queueH")); queueConf.getMaxResources("root.queueG").getResource());
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueG.queueH").getResource());
assertEquals(Resources.createResource(1024, 0), assertEquals(Resources.createResource(1024, 0),
queueConf.getMinResources("root.queueA")); queueConf.getMinResources("root.queueA"));
@ -279,9 +279,9 @@ public class TestAllocationFileLoaderService {
assertNull("Max child resources unexpectedly set for queue root.queueE", assertNull("Max child resources unexpectedly set for queue root.queueE",
queueConf.getMaxChildResources("root.queueE")); queueConf.getMaxChildResources("root.queueE"));
assertEquals(Resources.createResource(2048, 64), assertEquals(Resources.createResource(2048, 64),
queueConf.getMaxChildResources("root.queueF")); queueConf.getMaxChildResources("root.queueF").getResource());
assertEquals(Resources.createResource(2048, 64), assertEquals(Resources.createResource(2048, 64),
queueConf.getMaxChildResources("root.queueG")); queueConf.getMaxChildResources("root.queueG").getResource());
assertNull("Max child resources unexpectedly set for " assertNull("Max child resources unexpectedly set for "
+ "queue root.queueG.queueH", + "queue root.queueG.queueH",
queueConf.getMaxChildResources("root.queueG.queueH")); queueConf.getMaxChildResources("root.queueG.queueH"));

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* To test class {@link ConfigurableResource}.
*/
public class TestConfigurableResource {
private final Resource clusterResource = Resources.createResource(2048, 2);
@Test
public void testGetResourceWithPercentage() {
ConfigurableResource configurableResource =
new ConfigurableResource(new double[] {0.5, 0.5});
assertEquals(
configurableResource.getResource(clusterResource).getMemorySize(),
1024);
assertEquals(
configurableResource.getResource(clusterResource).getVirtualCores(), 1);
assertNull("The absolute resource should be null since object"
+ " configurableResource is initialized with percentages",
configurableResource.getResource());
assertNull("The absolute resource should be null since cluster resource"
+ " is null", configurableResource.getResource(null));
}
@Test
public void testGetResourceWithAbsolute() {
ConfigurableResource configurableResource =
new ConfigurableResource(Resources.createResource(3072, 3));
assertEquals(configurableResource.getResource().getMemorySize(), 3072);
assertEquals(configurableResource.getResource().getVirtualCores(), 3);
assertEquals(
configurableResource.getResource(clusterResource).getMemorySize(),
3072);
assertEquals(
configurableResource.getResource(clusterResource).getVirtualCores(),
3);
assertEquals(configurableResource.getResource(null).getMemorySize(), 3072);
assertEquals(configurableResource.getResource(null).getVirtualCores(), 3);
}
}

View File

@ -81,7 +81,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
String queueName = "root.queue1"; String queueName = "root.queue1";
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
schedulable.setMaxShare(maxResource); schedulable.setMaxShare(new ConfigurableResource(maxResource));
assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE); assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE);
assertEquals(schedulable.getMetrics().getSchedulingPolicy(), assertEquals(schedulable.getMetrics().getSchedulingPolicy(),
SchedulingPolicy.DEFAULT_POLICY.getName()); SchedulingPolicy.DEFAULT_POLICY.getName());

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -42,6 +43,8 @@ public class TestFSParentQueue {
AllocationConfiguration allocConf = new AllocationConfiguration(conf); AllocationConfiguration allocConf = new AllocationConfiguration(conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
SystemClock clock = SystemClock.getInstance(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
notEmptyQueues = new HashSet<FSQueue>(); notEmptyQueues = new HashSet<FSQueue>();

View File

@ -4608,7 +4608,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
QueueManager queueMgr = scheduler.getQueueManager(); QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSQueue queue2 = queueMgr.getLeafQueue("queue2", true); FSQueue queue2 = queueMgr.getLeafQueue("queue2", true);
queue2.setMaxShare(Resource.newInstance(1024, 1)); queue2.setMaxShare(
new ConfigurableResource(Resource.newInstance(1024, 1)));
ApplicationAttemptId appAttId = ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3); createSchedulingRequest(1024, 1, "queue1", "user1", 3);
@ -5206,6 +5207,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.reinitialize(conf, resourceManager.getRMContext());
Resource maxResource = Resources.createResource(1024 * 8); Resource maxResource = Resources.createResource(1024 * 8);
ConfigurableResource maxResourceConf =
new ConfigurableResource(maxResource);
FSAppAttempt app1 = mock(FSAppAttempt.class); FSAppAttempt app1 = mock(FSAppAttempt.class);
Mockito.when(app1.getDemand()).thenReturn(maxResource); Mockito.when(app1.getDemand()).thenReturn(maxResource);
@ -5217,15 +5220,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSLeafQueue aQueue = FSLeafQueue aQueue =
new FSLeafQueue("root.queue1.a", scheduler, queue1); new FSLeafQueue("root.queue1.a", scheduler, queue1);
aQueue.setMaxShare(maxResource); aQueue.setMaxShare(maxResourceConf);
aQueue.addAppSchedulable(app1); aQueue.addAppSchedulable(app1);
FSLeafQueue bQueue = FSLeafQueue bQueue =
new FSLeafQueue("root.queue1.b", scheduler, queue1); new FSLeafQueue("root.queue1.b", scheduler, queue1);
bQueue.setMaxShare(maxResource); bQueue.setMaxShare(maxResourceConf);
bQueue.addAppSchedulable(app2); bQueue.addAppSchedulable(app2);
queue1.setMaxShare(maxResource); queue1.setMaxShare(maxResourceConf);
queue1.addChildQueue(aQueue); queue1.addChildQueue(aQueue);
queue1.addChildQueue(bQueue); queue1.addChildQueue(bQueue);
@ -5263,7 +5266,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSLeafQueue child1 = FSLeafQueue child1 =
scheduler.getQueueManager().getLeafQueue("parent.child1", false); scheduler.getQueueManager().getLeafQueue("parent.child1", false);
Resource resource = Resource.newInstance(4 * GB, 4); Resource resource = Resource.newInstance(4 * GB, 4);
child1.setMaxShare(resource); child1.setMaxShare(new ConfigurableResource(resource));
FSAppAttempt app = mock(FSAppAttempt.class); FSAppAttempt app = mock(FSAppAttempt.class);
Mockito.when(app.getDemand()).thenReturn(resource); Mockito.when(app.getDemand()).thenReturn(resource);
Mockito.when(app.getResourceUsage()).thenReturn(resource); Mockito.when(app.getResourceUsage()).thenReturn(resource);
@ -5291,7 +5294,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertTrue(child1.dumpState().equals(childQueueString)); assertTrue(child1.dumpState().equals(childQueueString));
FSParentQueue parent = FSParentQueue parent =
scheduler.getQueueManager().getParentQueue("parent", false); scheduler.getQueueManager().getParentQueue("parent", false);
parent.setMaxShare(resource); parent.setMaxShare(new ConfigurableResource(resource));
parent.updateDemand(); parent.updateDemand();
String parentQueueString = "{Name: root.parent," String parentQueueString = "{Name: root.parent,"

View File

@ -20,11 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.parseResourceConfigValue; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.parseResourceConfigValue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.File; import org.apache.hadoop.yarn.api.records.Resource;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Test; import org.junit.Test;
@ -32,23 +28,59 @@ public class TestFairSchedulerConfiguration {
@Test @Test
public void testParseResourceConfigValue() throws Exception { public void testParseResourceConfigValue() throws Exception {
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("2 vcores, 1024 mb")); parseResourceConfigValue("2 vcores, 1024 mb").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024 mb, 2 vcores")); parseResourceConfigValue("1024 mb, 2 vcores").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("2vcores,1024mb")); parseResourceConfigValue("2vcores,1024mb").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024mb,2vcores")); parseResourceConfigValue("1024mb,2vcores").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024 mb, 2 vcores")); parseResourceConfigValue("1024 mb, 2 vcores").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("1024 Mb, 2 vCores")); parseResourceConfigValue("1024 Mb, 2 vCores").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue(" 1024 mb, 2 vcores ")); parseResourceConfigValue(" 1024 mb, 2 vcores ").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue(" 1024.3 mb, 2.35 vcores ")); parseResourceConfigValue(" 1024.3 mb, 2.35 vcores ").getResource());
assertEquals(BuilderUtils.newResource(1024, 2), assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue(" 1024. mb, 2. vcores ")); parseResourceConfigValue(" 1024. mb, 2. vcores ").getResource());
Resource clusterResource = BuilderUtils.newResource(2048, 4);
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("50% memory, 50% cpu").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("50% Memory, 50% CpU").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("50%").getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 4),
parseResourceConfigValue("50% memory, 100% cpu").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 4),
parseResourceConfigValue(" 100% cpu, 50% memory").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 0),
parseResourceConfigValue("50% memory, 0% cpu").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("50 % memory, 50 % cpu").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("50%memory,50%cpu").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue(" 50 % memory, 50 % cpu ").
getResource(clusterResource));
assertEquals(BuilderUtils.newResource(1024, 2),
parseResourceConfigValue("50.% memory, 50.% cpu").
getResource(clusterResource));
clusterResource = BuilderUtils.newResource(1024 * 10, 4);
assertEquals(BuilderUtils.newResource((int)(1024 * 10 * 0.109), 2),
parseResourceConfigValue("10.9% memory, 50.6% cpu").
getResource(clusterResource));
} }
@Test(expected = AllocationConfigurationException.class) @Test(expected = AllocationConfigurationException.class)
@ -70,5 +102,24 @@ public class TestFairSchedulerConfiguration {
public void testGibberish() throws Exception { public void testGibberish() throws Exception {
parseResourceConfigValue("1o24vc0res"); parseResourceConfigValue("1o24vc0res");
} }
@Test(expected = AllocationConfigurationException.class)
public void testNoUnitsPercentage() throws Exception {
parseResourceConfigValue("95%, 50% memory");
}
@Test(expected = AllocationConfigurationException.class)
public void testInvalidNumPercentage() throws Exception {
parseResourceConfigValue("95A% cpu, 50% memory");
}
@Test(expected = AllocationConfigurationException.class)
public void testCpuPercentageMemoryAbsolute() throws Exception {
parseResourceConfigValue("50% cpu, 1024 mb");
}
@Test(expected = AllocationConfigurationException.class)
public void testMemoryPercentageCpuAbsolute() throws Exception {
parseResourceConfigValue("50% memory, 2 vcores");
}
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -55,7 +56,9 @@ public class TestMaxRunningAppsEnforcer {
AllocationConfiguration allocConf = new AllocationConfiguration( AllocationConfiguration allocConf = new AllocationConfiguration(
conf); conf);
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
queueManager = new QueueManager(scheduler); queueManager = new QueueManager(scheduler);
queueManager.initialize(conf); queueManager.initialize(conf);
userMaxApps = allocConf.userMaxApps; userMaxApps = allocConf.userMaxApps;

View File

@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -50,6 +51,8 @@ public class TestQueueManager {
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
SystemClock clock = SystemClock.getInstance(); SystemClock clock = SystemClock.getInstance();
@ -206,7 +209,7 @@ public class TestQueueManager {
queueManager.updateAllocationConfiguration(allocConf); queueManager.updateAllocationConfiguration(allocConf);
queueManager.getQueue("root.test").setMaxChildQueueResource( queueManager.getQueue("root.test").setMaxChildQueueResource(
Resources.createResource(8192, 256)); new ConfigurableResource(Resources.createResource(8192, 256)));
FSQueue q1 = queueManager.createQueue("root.test.childC", FSQueueType.LEAF); FSQueue q1 = queueManager.createQueue("root.test.childC", FSQueueType.LEAF);
assertNotNull("Leaf queue root.test.childC was not created", assertNotNull("Leaf queue root.test.childC was not created",

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedule
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -43,6 +44,8 @@ public class TestFairSchedulerQueueInfo {
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
when(scheduler.getConf()).thenReturn(conf); when(scheduler.getConf()).thenReturn(conf);
when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1)); when(scheduler.getClusterResource()).thenReturn(Resource.newInstance(1, 1));
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
SystemClock clock = SystemClock.getInstance(); SystemClock clock = SystemClock.getInstance();
when(scheduler.getClock()).thenReturn(clock); when(scheduler.getClock()).thenReturn(clock);
QueueManager queueManager = new QueueManager(scheduler); QueueManager queueManager = new QueueManager(scheduler);

View File

@ -85,9 +85,9 @@ The allocation file must be in XML format. The format contains five types of ele
* **minResources**: minimum resources the queue is entitled to, in the form "X mb, Y vcores". For the single-resource fairness policy, the vcores value is ignored. If a queue's minimum share is not satisfied, it will be offered available resources before any other queue under the same parent. Under the single-resource fairness policy, a queue is considered unsatisfied if its memory usage is below its minimum memory share. Under dominant resource fairness, a queue is considered unsatisfied if its usage for its dominant resource with respect to the cluster capacity is below its minimum share for that resource. If multiple queues are unsatisfied in this situation, resources go to the queue with the smallest ratio between relevant resource usage and minimum. Note that it is possible that a queue that is below its minimum may not immediately get up to its minimum when it submits an application, because already-running jobs may be using those resources. * **minResources**: minimum resources the queue is entitled to, in the form "X mb, Y vcores". For the single-resource fairness policy, the vcores value is ignored. If a queue's minimum share is not satisfied, it will be offered available resources before any other queue under the same parent. Under the single-resource fairness policy, a queue is considered unsatisfied if its memory usage is below its minimum memory share. Under dominant resource fairness, a queue is considered unsatisfied if its usage for its dominant resource with respect to the cluster capacity is below its minimum share for that resource. If multiple queues are unsatisfied in this situation, resources go to the queue with the smallest ratio between relevant resource usage and minimum. Note that it is possible that a queue that is below its minimum may not immediately get up to its minimum when it submits an application, because already-running jobs may be using those resources.
* **maxResources**: maximum resources a queue is allowed, in the form "X mb, Y vcores". A queue will never be assigned a container that would put its aggregate usage over this limit. * **maxResources**: maximum resources a queue is allocated, expressed either in absolute values (X mb, Y vcores) or as a percentage of the cluster resources (X% memory, Y% cpu). A queue will not be assigned a container that would put its aggregate usage over this limit.
* **maxChildResources**: maximum resources an ad hoc child queue is allowed, in the form "X mb, Y vcores". Any ad hoc queue that is a direct child of a queue with this property set will have it's maxResources property set accordingly. * **maxChildResources**: maximum resources an ad hoc child queue is allocated, expressed either in absolute values (X mb, Y vcores) or as a percentage of the cluster resources (X% memory, Y% cpu). An ad hoc child queue will not be assigned a container that would put its aggregate usage over this limit.
* **maxRunningApps**: limit the number of apps from the queue to run at once * **maxRunningApps**: limit the number of apps from the queue to run at once