YARN-4702. FairScheduler: Allow setting maxResources for ad hoc queues. (Daniel Templeton via kasha)

(cherry picked from commit 20f0eb871c)
This commit is contained in:
Karthik Kambatla 2016-08-17 17:40:20 -07:00 committed by Karthik Kambatla
parent a20b943cf9
commit 07d5ab16df
8 changed files with 670 additions and 125 deletions

View File

@ -46,6 +46,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
// Maximum amount of resources per queue
@VisibleForTesting
final Map<String, Resource> maxQueueResources;
// Maximum amount of resources for each queue's ad hoc children
private final Map<String, Resource> maxChildQueueResources;
// Sharing weights for each queue
private final Map<String, ResourceWeights> queueWeights;
@ -107,6 +109,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources,
Map<String, Resource> maxChildQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, ResourceWeights> queueWeights,
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
@ -126,6 +129,7 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
Set<String> nonPreemptableQueues) {
this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources;
this.maxChildQueueResources = maxChildQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
this.queueMaxAMShares = queueMaxAMShares;
@ -149,31 +153,32 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
}
public AllocationConfiguration(Configuration conf) {
minQueueResources = new HashMap<String, Resource>();
maxQueueResources = new HashMap<String, Resource>();
queueWeights = new HashMap<String, ResourceWeights>();
queueMaxApps = new HashMap<String, Integer>();
userMaxApps = new HashMap<String, Integer>();
queueMaxAMShares = new HashMap<String, Float>();
minQueueResources = new HashMap<>();
maxChildQueueResources = new HashMap<>();
maxQueueResources = new HashMap<>();
queueWeights = new HashMap<>();
queueMaxApps = new HashMap<>();
userMaxApps = new HashMap<>();
queueMaxAMShares = new HashMap<>();
userMaxAppsDefault = Integer.MAX_VALUE;
queueMaxAppsDefault = Integer.MAX_VALUE;
queueMaxResourcesDefault = Resources.unbounded();
queueMaxAMShareDefault = 0.5f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
resAcls = new HashMap<String, Map<ReservationACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
fairSharePreemptionTimeouts = new HashMap<String, Long>();
fairSharePreemptionThresholds = new HashMap<String, Float>();
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
queueAcls = new HashMap<>();
resAcls = new HashMap<>();
minSharePreemptionTimeouts = new HashMap<>();
fairSharePreemptionTimeouts = new HashMap<>();
fairSharePreemptionThresholds = new HashMap<>();
schedulingPolicies = new HashMap<>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
reservableQueues = new HashSet<>();
configuredQueues = new HashMap<FSQueueType, Set<String>>();
configuredQueues = new HashMap<>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>());
}
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
configuredQueues);
nonPreemptableQueues = new HashSet<String>();
placementPolicy =
QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
nonPreemptableQueues = new HashSet<>();
}
/**
@ -263,7 +268,10 @@ public float getQueueMaxAMShare(String queue) {
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.
*
* @param queue the target queue's name
* @return the min allocation on this queue or {@link Resources#none}
* if not set
*/
public Resource getMinResources(String queue) {
Resource minQueueResource = minQueueResources.get(queue);
@ -271,14 +279,26 @@ public Resource getMinResources(String queue) {
}
/**
* Get the maximum resource allocation for the given queue.
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
* Set the maximum resource allocation for the given queue.
*
* @param queue the target queue
* @param maxResource the maximum resource allocation
*/
void setMaxResources(String queue, Resource maxResource) {
maxQueueResources.put(queue, maxResource);
}
public Resource getMaxResources(String queueName) {
Resource maxQueueResource = maxQueueResources.get(queueName);
/**
* Get the maximum resource allocation for the given queue. If the max is not
* set, return the larger of the min and the default max.
*
* @param queue the target queue's name
* @return the max allocation on this queue
*/
public Resource getMaxResources(String queue) {
Resource maxQueueResource = maxQueueResources.get(queue);
if (maxQueueResource == null) {
Resource minQueueResource = minQueueResources.get(queueName);
Resource minQueueResource = minQueueResources.get(queue);
if (minQueueResource != null &&
Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(),
minQueueResource, queueMaxResourcesDefault)) {
@ -291,6 +311,27 @@ public Resource getMaxResources(String queueName) {
}
}
/**
* Get the maximum resource allocation configured for the children of the
* given queue. The value is looked up by queue name in the child-maximum map
* and is not the limit of the named queue itself.
*
* @param queue the name of the parent queue whose child limit is wanted
* @return the max allocation recorded for children of this queue, or
* {@code null} if no child maximum has been recorded for it
*/
public Resource getMaxChildResources(String queue) {
return maxChildQueueResources.get(queue);
}
/**
* Set the maximum resource allocation for the children of the given queue,
* replacing any value previously recorded for that queue name.
* Use of this method is primarily intended for testing purposes.
*
* @param queue the name of the parent queue whose child limit is being set
* @param maxResource the maximum resource allocation to record for the
* queue's children
*/
void setMaxChildResources(String queue, Resource maxResource) {
maxChildQueueResources.put(queue, maxResource);
}
public boolean hasAccess(String queueName, QueueACL acl,
UserGroupInformation user) {
int lastPeriodIndex = queueName.length();

View File

@ -213,23 +213,22 @@ public synchronized void reloadAllocations() throws IOException,
LOG.info("Loading allocation file " + allocFile);
// Create some temporary hashmaps to hold the new allocs, and we only save
// them in our fields if we have parsed the entire allocs file successfully.
Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Float> fairSharePreemptionThresholds =
new HashMap<String, Float>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
Map<String, Resource> minQueueResources = new HashMap<>();
Map<String, Resource> maxQueueResources = new HashMap<>();
Map<String, Resource> maxChildQueueResources = new HashMap<>();
Map<String, Integer> queueMaxApps = new HashMap<>();
Map<String, Integer> userMaxApps = new HashMap<>();
Map<String, Float> queueMaxAMShares = new HashMap<>();
Map<String, ResourceWeights> queueWeights = new HashMap<>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
new HashMap<String, Map<ReservationACL, AccessControlList>>();
Set<String> reservableQueues = new HashSet<String>();
Set<String> nonPreemptableQueues = new HashSet<String>();
new HashMap<>();
Set<String> reservableQueues = new HashSet<>();
Set<String> nonPreemptableQueues = new HashSet<>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
Resource queueMaxResourcesDefault = Resources.unbounded();
@ -250,8 +249,8 @@ public synchronized void reloadAllocations() throws IOException,
// configuredQueues is segregated based on whether it is a leaf queue
// or a parent queue. This information is used for creating queues
// and also for making queue placement decisions(QueuePlacementRule.java).
Map<FSQueueType, Set<String>> configuredQueues =
new HashMap<FSQueueType, Set<String>>();
Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>());
}
@ -368,10 +367,11 @@ public synchronized void reloadAllocations() throws IOException,
parent = null;
}
loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls, reservationAcls,
configuredQueues, reservableQueues, nonPreemptableQueues);
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
queueWeights, queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
reservationAcls, configuredQueues, reservableQueues,
nonPreemptableQueues);
}
// Load placement policy and pass it configured queues
@ -413,14 +413,15 @@ public synchronized void reloadAllocations() throws IOException,
globalReservationQueueConfig.setReservationAgent(reservationAgent);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
defaultSchedPolicy, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
reservationAcls, newPlacementPolicy, configuredQueues,
globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
AllocationConfiguration info =
new AllocationConfiguration(minQueueResources, maxQueueResources,
maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
defaultSchedPolicy, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
reservationAcls, newPlacementPolicy, configuredQueues,
globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@ -433,8 +434,11 @@ public synchronized void reloadAllocations() throws IOException,
*/
private void loadQueue(String parentName, Element element,
Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
Map<String, Resource> maxQueueResources,
Map<String, Resource> maxChildQueueResources,
Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps,
Map<String, Float> queueMaxAMShares,
Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
@ -463,8 +467,8 @@ private void loadQueue(String parentName, Element element,
if (parentName != null) {
queueName = parentName + "." + queueName;
}
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
Map<QueueACL, AccessControlList> acls = new HashMap<>();
Map<ReservationACL, AccessControlList> racls = new HashMap<>();
NodeList fields = element.getChildNodes();
boolean isLeaf = true;
@ -476,12 +480,19 @@ private void loadQueue(String parentName, Element element,
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
Resource val =
FairSchedulerConfiguration.parseResourceConfigValue(text);
minQueueResources.put(queueName, val);
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
Resource val =
FairSchedulerConfiguration.parseResourceConfigValue(text);
maxQueueResources.put(queueName, val);
} else if ("maxChildResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
Resource val =
FairSchedulerConfiguration.parseResourceConfigValue(text);
maxChildQueueResources.put(queueName, val);
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
@ -543,8 +554,8 @@ private void loadQueue(String parentName, Element element,
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts,
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
queueWeights, queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
queueAcls, resAcls, configuredQueues, reservableQueues,
nonPreemptableQueues);
@ -574,9 +585,8 @@ private void loadQueue(String parentName, Element element,
&& !Resources.fitsIn(minQueueResources.get(queueName),
maxQueueResources.get(queueName))) {
LOG.warn(
String.format(
"Queue %s has max resources %s less than min resources %s",
queueName, maxQueueResources.get(queueName),
String.format("Queue %s has max resources %s less than "
+ "min resources %s", queueName, maxQueueResources.get(queueName),
minQueueResources.get(queueName)));
}
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
@ -1573,11 +1574,41 @@ public void onReload(AllocationConfiguration queueInfo) {
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
queueMgr.updateAllocationConfiguration(allocConf);
applyChildDefaults();
maxRunningEnforcer.updateRunnabilityOnReload();
}
}
}
/**
 * Restore the max resource settings of ad hoc queues after the allocation
 * configuration has been replaced. Every non-root queue known to the queue
 * manager that is not listed among the configured leaf or parent queues is
 * given the child maximum that the current configuration records for its
 * parent, if there is one. The values always come from the current
 * configuration; limits applied under a previous configuration are not
 * carried over.
 */
private void applyChildDefaults() {
  Collection<FSQueue> knownQueues = queueMgr.getQueues();
  Set<String> leafNames =
      allocConf.getConfiguredQueues().get(FSQueueType.LEAF);
  Set<String> parentNames =
      allocConf.getConfiguredQueues().get(FSQueueType.PARENT);

  for (FSQueue candidate : knownQueues) {
    // The root queue has no parent to take a child limit from
    if (candidate.getParent() == null) {
      continue;
    }

    // Queues named in the configuration are not ad hoc; leave them alone
    if (leafNames.contains(candidate.getName()) ||
        parentNames.contains(candidate.getName())) {
      continue;
    }

    Resource childMax =
        allocConf.getMaxChildResources(candidate.getParent().getName());

    if (childMax != null) {
      allocConf.setMaxResources(candidate.getName(), childMax);
    }
  }
}
@Override
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
FSQueue queue = queueMgr.getQueue(queueName);

View File

@ -39,6 +39,9 @@
import com.google.common.base.CharMatcher;
import com.google.common.annotations.VisibleForTesting;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.Resource;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
@ -173,15 +176,42 @@ private FSQueue getQueue(
}
/**
* Creates a leaf or parent queue based on what is specified in 'queueType'
* and places it in the tree. Creates any parents that don't already exist.
* Create a leaf or parent queue based on what is specified in
* {@code queueType} and place it in the tree. Create any parents that don't
* already exist.
*
* @return
* the created queue, if successful. null if not allowed (one of the parent
* queues in the queue name is already a leaf queue)
* @return the created queue, if successful or null if not allowed (one of the
* parent queues in the queue name is already a leaf queue)
*/
private FSQueue createQueue(String name, FSQueueType queueType) {
List<String> newQueueNames = new ArrayList<String>();
@VisibleForTesting
FSQueue createQueue(String name, FSQueueType queueType) {
List<String> newQueueNames = new ArrayList<>();
FSParentQueue parent = buildNewQueueList(name, newQueueNames);
FSQueue queue = null;
if (parent != null) {
// Now that we know everything worked out, make all the queues
// and add them to the map.
queue = createNewQueues(queueType, parent, newQueueNames);
}
return queue;
}
/**
* Compile a list of all parent queues of the given queue name that do not
* already exist. The queue names will be added to the {@code newQueueNames}
* list. The list will be in order of increasing queue depth. The first
* element of the list will be the parent closest to the root. The last
* element added will be the queue to be created. This method returns the
* deepest parent that does exist.
*
* @param name the fully qualified name of the queue to create
* @param newQueueNames the list to which to add non-existent queues
* @return the deepest existing parent queue
*/
private FSParentQueue buildNewQueueList(String name,
List<String> newQueueNames) {
newQueueNames.add(name);
int sepIndex = name.length();
FSParentQueue parent = null;
@ -195,62 +225,120 @@ private FSQueue createQueue(String name, FSQueueType queueType) {
throw new InvalidQueueNameException("Illegal node name at offset " +
(sepIndex+1) + " for queue name " + name);
}
FSQueue queue;
String curName = null;
curName = name.substring(0, sepIndex);
queue = queues.get(curName);
String curName = name.substring(0, sepIndex);
FSQueue queue = queues.get(curName);
if (queue == null) {
newQueueNames.add(curName);
newQueueNames.add(0, curName);
} else {
if (queue instanceof FSParentQueue) {
parent = (FSParentQueue)queue;
break;
} else {
return null;
}
}
}
// At this point, parent refers to the deepest existing parent of the
// queue to create.
// Now that we know everything worked out, make all the queues
// and add them to the map.
AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
FSLeafQueue leafQueue = null;
for (int i = newQueueNames.size()-1; i >= 0; i--) {
String queueName = newQueueNames.get(i);
if (i == 0 && queueType != FSQueueType.PARENT) {
leafQueue = new FSLeafQueue(name, scheduler, parent);
try {
leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
} catch (AllocationConfigurationException ex) {
LOG.warn("Failed to set default scheduling policy "
+ queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex);
}
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
leafQueue.updatePreemptionVariables();
return leafQueue;
} else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
try {
newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
} catch (AllocationConfigurationException ex) {
LOG.warn("Failed to set default scheduling policy "
+ queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex);
}
parent.addChildQueue(newParent);
queues.put(newParent.getName(), newParent);
newParent.updatePreemptionVariables();
parent = newParent;
// If the queue isn't a parent queue, parent will still be null when
// we break
break;
}
}
return parent;
}
/**
 * Instantiate and register every queue named in {@code newQueueNames}.
 * The names are processed in list order: the first queue becomes a child of
 * {@code topParent} and each later queue becomes a child of the queue
 * created just before it. Every queue except the last is created as a
 * parent queue; the last one is a leaf queue unless {@code queueType} is
 * {@link FSQueueType#PARENT}. Each new queue receives the default scheduling
 * policy (a failure to set it is logged and otherwise ignored), is attached
 * to its parent, has its child resource limit applied, and is added to the
 * queue map.
 *
 * @param queueType the type of the last queue to create
 * @param topParent the parent of the first queue to create
 * @param newQueueNames the names of the queues to create, shallowest first
 * @return the last queue created, or {@code null} if the list was empty
 */
private FSQueue createNewQueues(FSQueueType queueType,
    FSParentQueue topParent, List<String> newQueueNames) {
  AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
  FSParentQueue currentParent = topParent;
  FSQueue created = null;

  for (Iterator<String> names = newQueueNames.iterator(); names.hasNext();) {
    String queueName = names.next();
    boolean isLast = !names.hasNext();
    // Stays null when a leaf is created; that can only happen on the final
    // pass, so the null is never used as a parent.
    FSParentQueue nextParent = null;

    if (isLast && (queueType != FSQueueType.PARENT)) {
      FSLeafQueue leaf = new FSLeafQueue(queueName, scheduler, currentParent);

      try {
        leaf.setPolicy(queueConf.getDefaultSchedulingPolicy());
      } catch (AllocationConfigurationException ex) {
        LOG.warn("Failed to set default scheduling policy "
            + queueConf.getDefaultSchedulingPolicy()
            + " on new leaf queue.", ex);
      }

      leafQueues.add(leaf);
      created = leaf;
    } else {
      nextParent = new FSParentQueue(queueName, scheduler, currentParent);

      try {
        nextParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
      } catch (AllocationConfigurationException ex) {
        LOG.warn("Failed to set default scheduling policy "
            + queueConf.getDefaultSchedulingPolicy()
            + " on new parent queue.", ex);
      }

      created = nextParent;
    }

    // Common registration steps, in the same order for both queue kinds
    currentParent.addChildQueue(created);
    setChildResourceLimits(currentParent, created, queueConf);
    queues.put(created.getName(), created);
    created.updatePreemptionVariables();

    currentParent = nextParent;
  }

  return created;
}
/**
 * Give an ad hoc child queue the max resources that its parent declares as
 * the default for its children. A child whose name appears among the
 * configured leaf or parent queues is left untouched, as is a child whose
 * parent has no child maximum recorded. When a limit is applied it is
 * written without checking whether the child already has a max resource
 * setting.
 *
 * @param parent the parent queue
 * @param child the child queue
 * @param queueConf the {@link AllocationConfiguration}
 */
void setChildResourceLimits(FSParentQueue parent, FSQueue child,
    AllocationConfiguration queueConf) {
  Map<FSQueueType, Set<String>> configured = queueConf.getConfiguredQueues();

  // Only ad hoc queues are absent from the configured queues map
  boolean explicitlyConfigured =
      configured.get(FSQueueType.LEAF).contains(child.getName()) ||
      configured.get(FSQueueType.PARENT).contains(child.getName());

  if (explicitlyConfigured) {
    return;
  }

  // Ad hoc queues take their max resource allocation from the parent's
  // default child setting, when the parent has one.
  Resource inheritedMax = queueConf.getMaxChildResources(parent.getName());

  if (inheritedMax != null) {
    queueConf.setMaxResources(child.getName(), inheritedMax);
  }
}
/**
* Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to

View File

@ -179,12 +179,16 @@ public void testAllocationFileParsing() throws Exception {
out.println("<queue name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("</queue>");
//Make queue F a parent queue without configured leaf queues using the 'type' attribute
// Make queue F a parent queue without configured leaf queues using the
// 'type' attribute
out.println("<queue name=\"queueF\" type=\"parent\" >");
out.println("<maxChildResources>2048mb,64vcores</maxChildResources>");
out.println("</queue>");
// Create hierarchical queues G,H, with different min/fair share preemption
// timeouts and preemption thresholds
// timeouts and preemption thresholds. Also add a child default to make sure
// it doesn't impact queue H.
out.println("<queue name=\"queueG\">");
out.println("<maxChildResources>2048mb,64vcores</maxChildResources>");
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
out.println("<fairSharePreemptionThreshold>0.6</fairSharePreemptionThreshold>");
@ -240,6 +244,12 @@ public void testAllocationFileParsing() throws Exception {
queueConf.getMaxResources("root.queueD"));
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueE"));
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueF"));
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueG"));
assertEquals(Resources.createResource(4096, 100),
queueConf.getMaxResources("root.queueG.queueH"));
assertEquals(Resources.createResource(1024, 0),
queueConf.getMinResources("root.queueA"));
@ -251,8 +261,33 @@ public void testAllocationFileParsing() throws Exception {
queueConf.getMinResources("root.queueD"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueE"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueF"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueG"));
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root.queueG.queueH"));
assertEquals(15, queueConf.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertNull("Max child resources unexpectedly set for queue root.queueA",
queueConf.getMaxChildResources("root.queueA"));
assertNull("Max child resources unexpectedly set for queue root.queueB",
queueConf.getMaxChildResources("root.queueB"));
assertNull("Max child resources unexpectedly set for queue root.queueC",
queueConf.getMaxChildResources("root.queueC"));
assertNull("Max child resources unexpectedly set for queue root.queueD",
queueConf.getMaxChildResources("root.queueD"));
assertNull("Max child resources unexpectedly set for queue root.queueE",
queueConf.getMaxChildResources("root.queueE"));
assertEquals(Resources.createResource(2048, 64),
queueConf.getMaxChildResources("root.queueF"));
assertEquals(Resources.createResource(2048, 64),
queueConf.getMaxChildResources("root.queueG"));
assertNull("Max child resources unexpectedly set for "
+ "queue root.queueG.queueH",
queueConf.getMaxChildResources("root.queueG.queueH"));
assertEquals(15, queueConf.getQueueMaxApps("root."
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueConf.getQueueMaxApps("root.queueA"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueB"));
assertEquals(15, queueConf.getQueueMaxApps("root.queueC"));

View File

@ -139,8 +139,6 @@ public void setUp() throws IOException {
conf = createConfiguration();
resourceManager = new MockRM(conf);
// TODO: This test should really be using MockRM. For now starting stuff
// that is needed at a bare minimum.
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
resourceManager.getRMContext().getStateStore().start();
@ -335,8 +333,14 @@ public void testSimpleFairShareCalculation() throws IOException {
}
}
/**
* Test fair shares when max resources are set but are too high to impact
* the shares.
*
* @throws IOException if scheduler reinitialization fails
*/
@Test
public void testFairShareWithMaxResources() throws IOException {
public void testFairShareWithHighMaxResources() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
// set queueA and queueB maxResources,
// the sum of queueA and queueB maxResources is more than
@ -376,11 +380,184 @@ public void testFairShareWithMaxResources() throws IOException {
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
"queueA", false);
// queueA's weight is 0.25, so its fair share should be 2 * 1024.
assertEquals(2 * 1024, queue.getFairShare().getMemorySize());
assertEquals("Queue A did not get its expected fair share",
2 * 1024, queue.getFairShare().getMemorySize());
// queueB's weight is 0.75, so its fair share should be 6 * 1024.
queue = scheduler.getQueueManager().getLeafQueue(
"queueB", false);
assertEquals(6 * 1024, queue.getFairShare().getMemorySize());
assertEquals("Queue B did not get its expected fair share",
6 * 1024, queue.getFairShare().getMemorySize());
}
/**
 * Test fair shares when max resources are set and are low enough to impact
 * the shares. Two queues with weights 0.75 and 0.25 share a single 8GB node,
 * but each queue's max resources is below its weighted share, so the caps
 * determine both the fair shares and the number of running containers.
 *
 * @throws IOException if the allocation file cannot be written or scheduler
 * reinitialization fails
 */
@Test
public void testFairShareWithLowMaxResources() throws IOException {
  // try-with-resources so the allocation file is closed even if writing fails
  try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println(" <queue name=\"queueA\">");
    out.println(" <maxResources>1024 mb 1 vcores</maxResources>");
    out.println(" <weight>0.75</weight>");
    out.println(" </queue>");
    out.println(" <queue name=\"queueB\">");
    out.println(" <maxResources>3072 mb 3 vcores</maxResources>");
    out.println(" <weight>0.25</weight>");
    out.println(" </queue>");
    out.println("</allocations>");
  }

  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  scheduler.init(conf);
  scheduler.start();
  scheduler.reinitialize(conf, resourceManager.getRMContext());

  // Add one big node (only care about aggregate capacity)
  RMNode node1 =
      MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
          "127.0.0.1");
  NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
  scheduler.handle(nodeEvent1);

  ApplicationAttemptId attId1 =
      createSchedulingRequest(1024, 1, "queueA", "user1", 2);
  ApplicationAttemptId attId2 =
      createSchedulingRequest(1024, 1, "queueB", "user1", 4);

  scheduler.update();

  FSLeafQueue queue =
      scheduler.getQueueManager().getLeafQueue("queueA", false);
  // queueA's weight is 0.75, so its fair share should be 6GB, but it's
  // capped at 1GB.
  assertEquals("Queue A did not get its expected fair share",
      1 * 1024, queue.getFairShare().getMemorySize());

  // queueB's weight is 0.25, so its fair share should be 2GB, but the
  // other queue is capped at 1GB, so queueB's share is 7GB,
  // capped at 3GB.
  queue = scheduler.getQueueManager().getLeafQueue(
      "queueB", false);
  assertEquals("Queue B did not get its expected fair share",
      3 * 1024, queue.getFairShare().getMemorySize());

  // Six node heartbeats: enough chances to hand out all six requested
  // containers if the caps were not being enforced.
  NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
  for (int heartbeat = 0; heartbeat < 6; heartbeat++) {
    scheduler.handle(updateEvent);
  }

  // App 1 should be running with 1 container
  assertEquals("App 1 is not running with the correct number of containers",
      1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
  // App 2 should be running with 3 containers
  assertEquals("App 2 is not running with the correct number of containers",
      3, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
}
/**
 * Verify that ad hoc child queues honor the parent's maxChildResources
 * setting: the limit is applied when the children are created, raising it
 * lets the children grow, and lowering it leaves running containers alone.
 *
 * @throws IOException if scheduler reinitialization fails
 */
@Test
public void testChildMaxResources() throws IOException {
  writeChildMaxResourcesAllocFile("2048mb,2vcores");
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  scheduler.init(conf);
  scheduler.start();
  scheduler.reinitialize(conf, resourceManager.getRMContext());

  // A single large node is enough; only the aggregate capacity matters
  RMNode bigNode = MockNodes.newNodeInfo(1,
      Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
  scheduler.handle(new NodeAddedSchedulerEvent(bigNode));

  ApplicationAttemptId firstAttempt =
      createSchedulingRequest(1024, 1, "queueA.queueB", "user1", 8);
  ApplicationAttemptId secondAttempt =
      createSchedulingRequest(1024, 1, "queueA.queueC", "user1", 8);

  scheduler.update();

  NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(bigNode);
  for (int i = 0; i < 8; i++) {
    scheduler.handle(nodeUpdate);
  }

  // Each app is held to 2 containers by the 2048mb,2vcores child limit
  assertEquals("App 1 is not running with the correct number of containers",
      2, scheduler.getSchedulerApp(firstAttempt).getLiveContainers().size());
  assertEquals("App 2 is not running with the correct number of containers",
      2, scheduler.getSchedulerApp(secondAttempt).getLiveContainers().size());

  // Raise the child limit and reload the allocation file
  writeChildMaxResourcesAllocFile("3072mb,3vcores");
  scheduler.reinitialize(conf, resourceManager.getRMContext());
  scheduler.update();
  for (int i = 0; i < 4; i++) {
    scheduler.handle(nodeUpdate);
  }

  // Each app should have grown to 3 containers
  assertEquals("App 1 is not running with the correct number of containers",
      3, scheduler.getSchedulerApp(firstAttempt).getLiveContainers().size());
  assertEquals("App 2 is not running with the correct number of containers",
      3, scheduler.getSchedulerApp(secondAttempt).getLiveContainers().size());

  // Lower the child limit below the current usage and reload
  writeChildMaxResourcesAllocFile("1024mb,1vcores");
  scheduler.reinitialize(conf, resourceManager.getRMContext());
  scheduler.update();
  scheduler.handle(nodeUpdate);

  // Nothing is preempted, so both apps keep their 3 containers
  assertEquals("App 1 is not running with the correct number of containers",
      3, scheduler.getSchedulerApp(firstAttempt).getLiveContainers().size());
  assertEquals("App 2 is not running with the correct number of containers",
      3, scheduler.getSchedulerApp(secondAttempt).getLiveContainers().size());
}

/**
 * Write an allocation file that declares the parent queue queueA with the
 * given maxChildResources value.
 *
 * @param maxChildResources the text placed inside the maxChildResources
 *     element, e.g. "2048mb,2vcores"
 * @throws IOException if the allocation file cannot be opened for writing
 */
private void writeChildMaxResourcesAllocFile(String maxChildResources)
    throws IOException {
  PrintWriter allocWriter = new PrintWriter(new FileWriter(ALLOC_FILE));
  allocWriter.println("<?xml version=\"1.0\"?>");
  allocWriter.println("<allocations>");
  allocWriter.println(" <queue name=\"queueA\" type=\"parent\">");
  allocWriter.println(" <maxChildResources>" + maxChildResources
      + "</maxChildResources>");
  allocWriter.println(" </queue>");
  allocWriter.println("</allocations>");
  allocWriter.close();
}
@Test

View File

@ -24,6 +24,7 @@
import java.util.Set;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Test;
@ -33,27 +34,41 @@ public class TestQueueManager {
// Configuration created in setUp() and passed to queueManager.initialize()
private FairSchedulerConfiguration conf;
// Queue manager under test; rebuilt in setUp()
private QueueManager queueManager;
// Queues that the overridden QueueManager.isEmpty() reports as non-empty
private Set<FSQueue> notEmptyQueues;
// Scheduler whose allocation configuration the tests read through
// getAllocationConfiguration()
private FairScheduler scheduler;
/**
 * Build a {@link QueueManager} backed by a mocked {@link FairScheduler}.
 * The allocation configuration handed out by the mock is pre-populated with
 * a parent queue root.test whose max child resources are
 * {@code Resources.createResource(8192, 256)}, plus two configured children:
 * root.test.childA (leaf) and root.test.childB (parent).
 *
 * @throws Exception if the fixture cannot be set up
 */
@Before
public void setUp() throws Exception {
  conf = new FairSchedulerConfiguration();
  // Assign the field, not a local: a local named "scheduler" would shadow
  // the field and leave it null for the tests that read it.
  scheduler = mock(FairScheduler.class);

  AllocationConfiguration allocConf = new AllocationConfiguration(conf);

  // Set up some queues to test default child max resource inheritance
  allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test");
  allocConf.setMaxChildResources("root.test",
      Resources.createResource(8192, 256));
  allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.test.childA");
  allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test.childB");

  when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
  when(scheduler.getConf()).thenReturn(conf);

  SystemClock clock = SystemClock.getInstance();
  when(scheduler.getClock()).thenReturn(clock);

  notEmptyQueues = new HashSet<>();
  // Emptiness is driven by the test through notEmptyQueues: a queue counts
  // as empty unless the test has added it to that set.
  queueManager = new QueueManager(scheduler) {
    @Override
    public boolean isEmpty(FSQueue queue) {
      return !notEmptyQueues.contains(queue);
    }
  };

  FSQueueMetrics.forQueue("root", null, true, conf);
  queueManager.initialize(conf);
}
@Test
public void testReloadTurnsLeafQueueIntoParent() throws Exception {
updateConfiguredLeafQueues(queueManager, "queue1");
@ -143,4 +158,149 @@ private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLea
allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
queueMgr.updateAllocationConfiguration(allocConf);
}
/**
 * Verify that a leaf queue directly under root can be created and is
 * afterwards retrievable by name.
 */
@Test
public void testCreateLeafQueue() {
  queueManager.updateAllocationConfiguration(
      scheduler.getAllocationConfiguration());

  final String leafName = "root.queue1";
  FSQueue created = queueManager.createQueue(leafName, FSQueueType.LEAF);

  assertNotNull("Leaf queue root.queue1 was not created",
      queueManager.getLeafQueue(leafName, false));
  assertEquals("createQueue() returned wrong queue",
      leafName, created.getName());
}
/**
 * Verify that creating a nested leaf queue also creates the missing parent
 * queue above it.
 */
@Test
public void testCreateLeafQueueAndParent() {
  queueManager.updateAllocationConfiguration(
      scheduler.getAllocationConfiguration());

  final String parentName = "root.queue1";
  final String leafName = "root.queue1.queue2";
  FSQueue created = queueManager.createQueue(leafName, FSQueueType.LEAF);

  // The intermediate queue must exist as a parent, the target as a leaf
  assertNotNull("Parent queue root.queue1 was not created",
      queueManager.getParentQueue(parentName, false));
  assertNotNull("Leaf queue root.queue1.queue2 was not created",
      queueManager.getLeafQueue(leafName, false));
  assertEquals("createQueue() returned wrong queue",
      leafName, created.getName());
}
/**
 * Test creation of leaf and parent child queues when the parent queue has
 * child defaults set. In this test we rely on the root.test,
 * root.test.childA and root.test.childB queues that are created in the
 * {@link #setUp()} method.
 */
@Test
public void testCreateQueueWithChildDefaults() {
  AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
  queueManager.updateAllocationConfiguration(allocConf);

  // A new leaf under root.test must inherit the parent's max child resources
  FSQueue q1 = queueManager.createQueue("root.test.childC", FSQueueType.LEAF);
  assertNotNull("Leaf queue root.test.childC was not created",
      queueManager.getLeafQueue("root.test.childC", false));
  assertEquals("createQueue() returned wrong queue",
      "root.test.childC", q1.getName());
  assertEquals("Max resources for root.test.childC were not inherited from "
      + "parent's max child resources", Resources.createResource(8192, 256),
      allocConf.getMaxResources("root.test.childC"));

  // A new parent under root.test must inherit them as well
  FSQueue q2 = queueManager.createQueue("root.test.childD",
      FSQueueType.PARENT);
  assertNotNull("Parent queue root.test.childD was not created",
      queueManager.getParentQueue("root.test.childD", false));
  assertEquals("createQueue() returned wrong queue",
      "root.test.childD", q2.getName());
  assertEquals("Max resources for root.test.childD were not inherited "
      + "from parent's max child resources",
      Resources.createResource(8192, 256),
      allocConf.getMaxResources("root.test.childD"));

  // Check that the childA and childB queues weren't impacted
  // by the child defaults
  assertNotNull("Leaf queue root.test.childA was not created during setup",
      queueManager.getLeafQueue("root.test.childA", false));
  assertEquals("Max resources for root.test.childA were inherited from "
      + "parent's max child resources", Resources.unbounded(),
      allocConf.getMaxResources("root.test.childA"));
  assertNotNull("Parent queue root.test.childB was not created during setup",
      queueManager.getParentQueue("root.test.childB", false));
  assertEquals("Max resources for root.test.childB were inherited from "
      + "parent's max child resources", Resources.unbounded(),
      allocConf.getMaxResources("root.test.childB"));
}
/**
 * Verify that a leaf queue created without any configured limits reports
 * the default minimum and maximum resources.
 */
@Test
public void testCreateLeafQueueWithDefaults() {
  AllocationConfiguration allocations =
      scheduler.getAllocationConfiguration();

  final String leafName = "root.queue1";
  FSQueue created = queueManager.createQueue(leafName, FSQueueType.LEAF);

  assertNotNull("Leaf queue root.queue1 was not created",
      queueManager.getLeafQueue(leafName, false));
  assertEquals("createQueue() returned wrong queue",
      leafName, created.getName());

  // The default minimum is an empty (0, 0) resource
  Resource expectedMin = Resources.createResource(0, 0);
  assertEquals("Min resources were not set to default",
      expectedMin, allocations.getMinResources(leafName));

  // The default maximum is unbounded
  Resource expectedMax = Resources.unbounded();
  assertEquals("Max resources were not set to default",
      expectedMax, allocations.getMaxResources(leafName));
}
/**
 * Verify that a parent queue directly under root can be created and is
 * afterwards retrievable by name.
 */
@Test
public void testCreateParentQueue() {
  queueManager.updateAllocationConfiguration(
      scheduler.getAllocationConfiguration());

  final String parentName = "root.queue1";
  FSQueue created = queueManager.createQueue(parentName, FSQueueType.PARENT);

  assertNotNull("Parent queue root.queue1 was not created",
      queueManager.getParentQueue(parentName, false));
  assertEquals("createQueue() returned wrong queue",
      parentName, created.getName());
}
/**
 * Test creation of a parent queue and its parent: creating
 * root.queue1.queue2 as a parent queue must also create root.queue1 as a
 * parent queue.
 */
@Test
public void testCreateParentQueueAndParent() {
  AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
  queueManager.updateAllocationConfiguration(allocConf);

  FSQueue q2 = queueManager.createQueue("root.queue1.queue2",
      FSQueueType.PARENT);

  assertNotNull("Parent queue root.queue1 was not created",
      queueManager.getParentQueue("root.queue1", false));
  // queue2 is requested as a PARENT queue, so the message must say so
  assertNotNull("Parent queue root.queue1.queue2 was not created",
      queueManager.getParentQueue("root.queue1.queue2", false));
  assertEquals("createQueue() returned wrong queue",
      "root.queue1.queue2", q2.getName());
}
}

View File

@ -97,6 +97,8 @@ The allocation file must be in XML format. The format contains five types of ele
* maxResources: maximum resources a queue is allowed, in the form "X mb, Y vcores". A queue will never be assigned a container that would put its aggregate usage over this limit.
* maxChildResources: maximum resources an ad hoc child queue is allowed, in the form "X mb, Y vcores". Any ad hoc queue that is a direct child of a queue with this property set will have its maxResources property set accordingly.
* maxRunningApps: limit the number of apps from the queue to run at once
* maxAMShare: limit the fraction of the queue's fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f.
@ -174,6 +176,7 @@ The allocation file must be in XML format. The format contains five types of ele
user queues under it -->
<queue name="secondary_group_queue" type="parent">
<weight>3.0</weight>
<maxChildResources>4096 mb,4vcores</maxChildResources>
</queue>
<user name="sample_user">