YARN-4702. FairScheduler: Allow setting maxResources for ad hoc queues. (Daniel Templeton via kasha)
This commit is contained in:
parent
ca13e7971d
commit
20f0eb871c
|
@ -46,6 +46,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
// Maximum amount of resources per queue
|
||||
@VisibleForTesting
|
||||
final Map<String, Resource> maxQueueResources;
|
||||
// Maximum amount of resources for each queue's ad hoc children
|
||||
private final Map<String, Resource> maxChildQueueResources;
|
||||
// Sharing weights for each queue
|
||||
private final Map<String, ResourceWeights> queueWeights;
|
||||
|
||||
|
@ -107,6 +109,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
|
||||
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
||||
Map<String, Resource> maxQueueResources,
|
||||
Map<String, Resource> maxChildQueueResources,
|
||||
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
||||
Map<String, ResourceWeights> queueWeights,
|
||||
Map<String, Float> queueMaxAMShares, int userMaxAppsDefault,
|
||||
|
@ -126,6 +129,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
Set<String> nonPreemptableQueues) {
|
||||
this.minQueueResources = minQueueResources;
|
||||
this.maxQueueResources = maxQueueResources;
|
||||
this.maxChildQueueResources = maxChildQueueResources;
|
||||
this.queueMaxApps = queueMaxApps;
|
||||
this.userMaxApps = userMaxApps;
|
||||
this.queueMaxAMShares = queueMaxAMShares;
|
||||
|
@ -149,31 +153,32 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
}
|
||||
|
||||
public AllocationConfiguration(Configuration conf) {
|
||||
minQueueResources = new HashMap<String, Resource>();
|
||||
maxQueueResources = new HashMap<String, Resource>();
|
||||
queueWeights = new HashMap<String, ResourceWeights>();
|
||||
queueMaxApps = new HashMap<String, Integer>();
|
||||
userMaxApps = new HashMap<String, Integer>();
|
||||
queueMaxAMShares = new HashMap<String, Float>();
|
||||
minQueueResources = new HashMap<>();
|
||||
maxChildQueueResources = new HashMap<>();
|
||||
maxQueueResources = new HashMap<>();
|
||||
queueWeights = new HashMap<>();
|
||||
queueMaxApps = new HashMap<>();
|
||||
userMaxApps = new HashMap<>();
|
||||
queueMaxAMShares = new HashMap<>();
|
||||
userMaxAppsDefault = Integer.MAX_VALUE;
|
||||
queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||
queueMaxResourcesDefault = Resources.unbounded();
|
||||
queueMaxAMShareDefault = 0.5f;
|
||||
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||
resAcls = new HashMap<String, Map<ReservationACL, AccessControlList>>();
|
||||
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||
fairSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||
fairSharePreemptionThresholds = new HashMap<String, Float>();
|
||||
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
|
||||
queueAcls = new HashMap<>();
|
||||
resAcls = new HashMap<>();
|
||||
minSharePreemptionTimeouts = new HashMap<>();
|
||||
fairSharePreemptionTimeouts = new HashMap<>();
|
||||
fairSharePreemptionThresholds = new HashMap<>();
|
||||
schedulingPolicies = new HashMap<>();
|
||||
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
|
||||
reservableQueues = new HashSet<>();
|
||||
configuredQueues = new HashMap<FSQueueType, Set<String>>();
|
||||
configuredQueues = new HashMap<>();
|
||||
for (FSQueueType queueType : FSQueueType.values()) {
|
||||
configuredQueues.put(queueType, new HashSet<String>());
|
||||
configuredQueues.put(queueType, new HashSet<>());
|
||||
}
|
||||
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
|
||||
configuredQueues);
|
||||
nonPreemptableQueues = new HashSet<String>();
|
||||
placementPolicy =
|
||||
QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
|
||||
nonPreemptableQueues = new HashSet<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -263,7 +268,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
|
||||
/**
|
||||
* Get the minimum resource allocation for the given queue.
|
||||
* @return the cap set on this queue, or 0 if not set.
|
||||
*
|
||||
* @param queue the target queue's name
|
||||
* @return the min allocation on this queue or {@link Resources#none}
|
||||
* if not set
|
||||
*/
|
||||
public Resource getMinResources(String queue) {
|
||||
Resource minQueueResource = minQueueResources.get(queue);
|
||||
|
@ -271,14 +279,26 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the maximum resource allocation for the given queue.
|
||||
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
|
||||
* Set the maximum resource allocation for the given queue.
|
||||
*
|
||||
* @param queue the target queue
|
||||
* @param maxResource the maximum resource allocation
|
||||
*/
|
||||
void setMaxResources(String queue, Resource maxResource) {
|
||||
maxQueueResources.put(queue, maxResource);
|
||||
}
|
||||
|
||||
public Resource getMaxResources(String queueName) {
|
||||
Resource maxQueueResource = maxQueueResources.get(queueName);
|
||||
/**
|
||||
* Get the maximum resource allocation for the given queue. If the max in not
|
||||
* set, return the larger of the min and the default max.
|
||||
*
|
||||
* @param queue the target queue's name
|
||||
* @return the max allocation on this queue
|
||||
*/
|
||||
public Resource getMaxResources(String queue) {
|
||||
Resource maxQueueResource = maxQueueResources.get(queue);
|
||||
if (maxQueueResource == null) {
|
||||
Resource minQueueResource = minQueueResources.get(queueName);
|
||||
Resource minQueueResource = minQueueResources.get(queue);
|
||||
if (minQueueResource != null &&
|
||||
Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(),
|
||||
minQueueResource, queueMaxResourcesDefault)) {
|
||||
|
@ -291,6 +311,27 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the maximum resource allocation for children of the given queue.
|
||||
*
|
||||
* @param queue the target queue's name
|
||||
* @return the max allocation on this queue or null if not set
|
||||
*/
|
||||
public Resource getMaxChildResources(String queue) {
|
||||
return maxChildQueueResources.get(queue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the maximum resource allocation for the children of the given queue.
|
||||
* Use of this method is primarily intended for testing purposes.
|
||||
*
|
||||
* @param queue the target queue
|
||||
* @param maxResource the maximum resource allocation
|
||||
*/
|
||||
void setMaxChildResources(String queue, Resource maxResource) {
|
||||
maxChildQueueResources.put(queue, maxResource);
|
||||
}
|
||||
|
||||
public boolean hasAccess(String queueName, QueueACL acl,
|
||||
UserGroupInformation user) {
|
||||
int lastPeriodIndex = queueName.length();
|
||||
|
|
|
@ -213,23 +213,22 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
LOG.info("Loading allocation file " + allocFile);
|
||||
// Create some temporary hashmaps to hold the new allocs, and we only save
|
||||
// them in our fields if we have parsed the entire allocs file successfully.
|
||||
Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
|
||||
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
|
||||
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
|
||||
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
|
||||
Map<String, Float> queueMaxAMShares = new HashMap<String, Float>();
|
||||
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
|
||||
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
|
||||
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>();
|
||||
Map<String, Float> fairSharePreemptionThresholds =
|
||||
new HashMap<String, Float>();
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
||||
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||
Map<String, Resource> minQueueResources = new HashMap<>();
|
||||
Map<String, Resource> maxQueueResources = new HashMap<>();
|
||||
Map<String, Resource> maxChildQueueResources = new HashMap<>();
|
||||
Map<String, Integer> queueMaxApps = new HashMap<>();
|
||||
Map<String, Integer> userMaxApps = new HashMap<>();
|
||||
Map<String, Float> queueMaxAMShares = new HashMap<>();
|
||||
Map<String, ResourceWeights> queueWeights = new HashMap<>();
|
||||
Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
|
||||
Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
|
||||
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
|
||||
Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
|
||||
Map<String, Map<ReservationACL, AccessControlList>> reservationAcls =
|
||||
new HashMap<String, Map<ReservationACL, AccessControlList>>();
|
||||
Set<String> reservableQueues = new HashSet<String>();
|
||||
Set<String> nonPreemptableQueues = new HashSet<String>();
|
||||
new HashMap<>();
|
||||
Set<String> reservableQueues = new HashSet<>();
|
||||
Set<String> nonPreemptableQueues = new HashSet<>();
|
||||
int userMaxAppsDefault = Integer.MAX_VALUE;
|
||||
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||
Resource queueMaxResourcesDefault = Resources.unbounded();
|
||||
|
@ -250,8 +249,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
// configuredQueues is segregated based on whether it is a leaf queue
|
||||
// or a parent queue. This information is used for creating queues
|
||||
// and also for making queue placement decisions(QueuePlacementRule.java).
|
||||
Map<FSQueueType, Set<String>> configuredQueues =
|
||||
new HashMap<FSQueueType, Set<String>>();
|
||||
Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
|
||||
|
||||
for (FSQueueType queueType : FSQueueType.values()) {
|
||||
configuredQueues.put(queueType, new HashSet<String>());
|
||||
}
|
||||
|
@ -368,10 +367,11 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
parent = null;
|
||||
}
|
||||
loadQueue(parent, element, minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
|
||||
fairSharePreemptionThresholds, queueAcls, reservationAcls,
|
||||
configuredQueues, reservableQueues, nonPreemptableQueues);
|
||||
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
|
||||
queueWeights, queuePolicies, minSharePreemptionTimeouts,
|
||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
||||
reservationAcls, configuredQueues, reservableQueues,
|
||||
nonPreemptableQueues);
|
||||
}
|
||||
|
||||
// Load placement policy and pass it configured queues
|
||||
|
@ -413,14 +413,15 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
globalReservationQueueConfig.setReservationAgent(reservationAgent);
|
||||
}
|
||||
|
||||
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
|
||||
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
|
||||
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
|
||||
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
|
||||
defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
||||
reservationAcls, newPlacementPolicy, configuredQueues,
|
||||
globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
|
||||
AllocationConfiguration info =
|
||||
new AllocationConfiguration(minQueueResources, maxQueueResources,
|
||||
maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights,
|
||||
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
|
||||
queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
|
||||
defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
||||
reservationAcls, newPlacementPolicy, configuredQueues,
|
||||
globalReservationQueueConfig, reservableQueues, nonPreemptableQueues);
|
||||
|
||||
lastSuccessfulReload = clock.getTime();
|
||||
lastReloadAttemptFailed = false;
|
||||
|
@ -433,8 +434,11 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
*/
|
||||
private void loadQueue(String parentName, Element element,
|
||||
Map<String, Resource> minQueueResources,
|
||||
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
|
||||
Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
|
||||
Map<String, Resource> maxQueueResources,
|
||||
Map<String, Resource> maxChildQueueResources,
|
||||
Map<String, Integer> queueMaxApps,
|
||||
Map<String, Integer> userMaxApps,
|
||||
Map<String, Float> queueMaxAMShares,
|
||||
Map<String, ResourceWeights> queueWeights,
|
||||
Map<String, SchedulingPolicy> queuePolicies,
|
||||
Map<String, Long> minSharePreemptionTimeouts,
|
||||
|
@ -463,8 +467,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
if (parentName != null) {
|
||||
queueName = parentName + "." + queueName;
|
||||
}
|
||||
Map<QueueACL, AccessControlList> acls =
|
||||
new HashMap<QueueACL, AccessControlList>();
|
||||
|
||||
Map<QueueACL, AccessControlList> acls = new HashMap<>();
|
||||
Map<ReservationACL, AccessControlList> racls = new HashMap<>();
|
||||
NodeList fields = element.getChildNodes();
|
||||
boolean isLeaf = true;
|
||||
|
@ -476,12 +480,19 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
Element field = (Element) fieldNode;
|
||||
if ("minResources".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
|
||||
Resource val =
|
||||
FairSchedulerConfiguration.parseResourceConfigValue(text);
|
||||
minQueueResources.put(queueName, val);
|
||||
} else if ("maxResources".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
|
||||
Resource val =
|
||||
FairSchedulerConfiguration.parseResourceConfigValue(text);
|
||||
maxQueueResources.put(queueName, val);
|
||||
} else if ("maxChildResources".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
Resource val =
|
||||
FairSchedulerConfiguration.parseResourceConfigValue(text);
|
||||
maxChildQueueResources.put(queueName, val);
|
||||
} else if ("maxRunningApps".equals(field.getTagName())) {
|
||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||
int val = Integer.parseInt(text);
|
||||
|
@ -543,8 +554,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
} else if ("queue".endsWith(field.getTagName()) ||
|
||||
"pool".equals(field.getTagName())) {
|
||||
loadQueue(queueName, field, minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||
queuePolicies, minSharePreemptionTimeouts,
|
||||
maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
|
||||
queueWeights, queuePolicies, minSharePreemptionTimeouts,
|
||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
|
||||
queueAcls, resAcls, configuredQueues, reservableQueues,
|
||||
nonPreemptableQueues);
|
||||
|
@ -574,9 +585,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
&& !Resources.fitsIn(minQueueResources.get(queueName),
|
||||
maxQueueResources.get(queueName))) {
|
||||
LOG.warn(
|
||||
String.format(
|
||||
"Queue %s has max resources %s less than min resources %s",
|
||||
queueName, maxQueueResources.get(queueName),
|
||||
String.format("Queue %s has max resources %s less than "
|
||||
+ "min resources %s", queueName, maxQueueResources.get(queueName),
|
||||
minQueueResources.get(queueName)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
|
@ -1573,11 +1574,41 @@ public class FairScheduler extends
|
|||
allocConf = queueInfo;
|
||||
allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
|
||||
queueMgr.updateAllocationConfiguration(allocConf);
|
||||
applyChildDefaults();
|
||||
maxRunningEnforcer.updateRunnabilityOnReload();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* After reloading the allocation config, the max resource settings for any
|
||||
* ad hoc queues will be missing. This method goes through the queue manager's
|
||||
* queue list and adds back the max resources settings for any ad hoc queues.
|
||||
* Note that the new max resource settings will be based on the new config.
|
||||
* The old settings are lost.
|
||||
*/
|
||||
private void applyChildDefaults() {
|
||||
Collection<FSQueue> queues = queueMgr.getQueues();
|
||||
Set<String> configuredLeafQueues =
|
||||
allocConf.getConfiguredQueues().get(FSQueueType.LEAF);
|
||||
Set<String> configuredParentQueues =
|
||||
allocConf.getConfiguredQueues().get(FSQueueType.PARENT);
|
||||
|
||||
for (FSQueue queue : queues) {
|
||||
// If the queue is ad hoc and not root, apply the child defaults
|
||||
if ((queue.getParent() != null) &&
|
||||
!configuredLeafQueues.contains(queue.getName()) &&
|
||||
!configuredParentQueues.contains(queue.getName())) {
|
||||
Resource max =
|
||||
allocConf.getMaxChildResources(queue.getParent().getName());
|
||||
|
||||
if (max != null) {
|
||||
allocConf.setMaxResources(queue.getName(), max);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
|
||||
FSQueue queue = queueMgr.getQueue(queueName);
|
||||
|
|
|
@ -39,6 +39,9 @@ import org.xml.sax.SAXException;
|
|||
|
||||
import com.google.common.base.CharMatcher;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.util.Iterator;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
/**
|
||||
* Maintains a list of queues as well as scheduling parameters for each queue,
|
||||
* such as guaranteed share allocations, from the fair scheduler config file.
|
||||
|
@ -173,15 +176,42 @@ public class QueueManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a leaf or parent queue based on what is specified in 'queueType'
|
||||
* and places it in the tree. Creates any parents that don't already exist.
|
||||
* Create a leaf or parent queue based on what is specified in
|
||||
* {@code queueType} and place it in the tree. Create any parents that don't
|
||||
* already exist.
|
||||
*
|
||||
* @return
|
||||
* the created queue, if successful. null if not allowed (one of the parent
|
||||
* queues in the queue name is already a leaf queue)
|
||||
* @return the created queue, if successful or null if not allowed (one of the
|
||||
* parent queues in the queue name is already a leaf queue)
|
||||
*/
|
||||
private FSQueue createQueue(String name, FSQueueType queueType) {
|
||||
List<String> newQueueNames = new ArrayList<String>();
|
||||
@VisibleForTesting
|
||||
FSQueue createQueue(String name, FSQueueType queueType) {
|
||||
List<String> newQueueNames = new ArrayList<>();
|
||||
FSParentQueue parent = buildNewQueueList(name, newQueueNames);
|
||||
FSQueue queue = null;
|
||||
|
||||
if (parent != null) {
|
||||
// Now that we know everything worked out, make all the queues
|
||||
// and add them to the map.
|
||||
queue = createNewQueues(queueType, parent, newQueueNames);
|
||||
}
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Compile a list of all parent queues of the given queue name that do not
|
||||
* already exist. The queue names will be added to the {@code newQueueNames}
|
||||
* list. The list will be in order of increasing queue depth. The first
|
||||
* element of the list will be the parent closest to the root. The last
|
||||
* element added will be the queue to be created. This method returns the
|
||||
* deepest parent that does exist.
|
||||
*
|
||||
* @param name the fully qualified name of the queue to create
|
||||
* @param newQueueNames the list to which to add non-existent queues
|
||||
* @return the deepest existing parent queue
|
||||
*/
|
||||
private FSParentQueue buildNewQueueList(String name,
|
||||
List<String> newQueueNames) {
|
||||
newQueueNames.add(name);
|
||||
int sepIndex = name.length();
|
||||
FSParentQueue parent = null;
|
||||
|
@ -195,62 +225,120 @@ public class QueueManager {
|
|||
throw new InvalidQueueNameException("Illegal node name at offset " +
|
||||
(sepIndex+1) + " for queue name " + name);
|
||||
}
|
||||
FSQueue queue;
|
||||
String curName = null;
|
||||
curName = name.substring(0, sepIndex);
|
||||
queue = queues.get(curName);
|
||||
|
||||
String curName = name.substring(0, sepIndex);
|
||||
FSQueue queue = queues.get(curName);
|
||||
|
||||
if (queue == null) {
|
||||
newQueueNames.add(curName);
|
||||
newQueueNames.add(0, curName);
|
||||
} else {
|
||||
if (queue instanceof FSParentQueue) {
|
||||
parent = (FSParentQueue)queue;
|
||||
break;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, parent refers to the deepest existing parent of the
|
||||
// queue to create.
|
||||
// Now that we know everything worked out, make all the queues
|
||||
// and add them to the map.
|
||||
AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
|
||||
FSLeafQueue leafQueue = null;
|
||||
for (int i = newQueueNames.size()-1; i >= 0; i--) {
|
||||
String queueName = newQueueNames.get(i);
|
||||
if (i == 0 && queueType != FSQueueType.PARENT) {
|
||||
leafQueue = new FSLeafQueue(name, scheduler, parent);
|
||||
try {
|
||||
leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
|
||||
} catch (AllocationConfigurationException ex) {
|
||||
LOG.warn("Failed to set default scheduling policy "
|
||||
+ queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex);
|
||||
}
|
||||
parent.addChildQueue(leafQueue);
|
||||
queues.put(leafQueue.getName(), leafQueue);
|
||||
leafQueues.add(leafQueue);
|
||||
leafQueue.updatePreemptionVariables();
|
||||
return leafQueue;
|
||||
} else {
|
||||
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
|
||||
try {
|
||||
newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
|
||||
} catch (AllocationConfigurationException ex) {
|
||||
LOG.warn("Failed to set default scheduling policy "
|
||||
+ queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex);
|
||||
}
|
||||
parent.addChildQueue(newParent);
|
||||
queues.put(newParent.getName(), newParent);
|
||||
newParent.updatePreemptionVariables();
|
||||
parent = newParent;
|
||||
// If the queue isn't a parent queue, parent will still be null when
|
||||
// we break
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return parent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create all queues in the {@code newQueueNames} list. The list must be in
|
||||
* order of increasing depth. All but the last element in the list will be
|
||||
* created as parent queues. The last element will be created as the type
|
||||
* specified by the {@code queueType} parameter. The first queue will be
|
||||
* created as a child of the {@code topParent} queue. All subsequent queues
|
||||
* will be created as a child of the previously created queue.
|
||||
*
|
||||
* @param queueType the type of the last queue to create
|
||||
* @param topParent the parent of the first queue to create
|
||||
* @param newQueueNames the list of queues to create
|
||||
* @return the last queue created
|
||||
*/
|
||||
private FSQueue createNewQueues(FSQueueType queueType,
|
||||
FSParentQueue topParent, List<String> newQueueNames) {
|
||||
AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
|
||||
Iterator<String> i = newQueueNames.iterator();
|
||||
FSParentQueue parent = topParent;
|
||||
FSQueue queue = null;
|
||||
|
||||
while (i.hasNext()) {
|
||||
FSParentQueue newParent = null;
|
||||
String queueName = i.next();
|
||||
|
||||
// Only create a leaf queue at the very end
|
||||
if (!i.hasNext() && (queueType != FSQueueType.PARENT)) {
|
||||
FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent);
|
||||
|
||||
try {
|
||||
leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
|
||||
} catch (AllocationConfigurationException ex) {
|
||||
LOG.warn("Failed to set default scheduling policy "
|
||||
+ queueConf.getDefaultSchedulingPolicy()
|
||||
+ " on new leaf queue.", ex);
|
||||
}
|
||||
|
||||
leafQueues.add(leafQueue);
|
||||
queue = leafQueue;
|
||||
} else {
|
||||
newParent = new FSParentQueue(queueName, scheduler, parent);
|
||||
|
||||
try {
|
||||
newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
|
||||
} catch (AllocationConfigurationException ex) {
|
||||
LOG.warn("Failed to set default scheduling policy "
|
||||
+ queueConf.getDefaultSchedulingPolicy()
|
||||
+ " on new parent queue.", ex);
|
||||
}
|
||||
|
||||
queue = newParent;
|
||||
}
|
||||
|
||||
parent.addChildQueue(queue);
|
||||
setChildResourceLimits(parent, queue, queueConf);
|
||||
queues.put(queue.getName(), queue);
|
||||
queue.updatePreemptionVariables();
|
||||
|
||||
// If we just created a leaf node, the newParent is null, but that's OK
|
||||
// because we only create a leaf node in the very last iteration.
|
||||
parent = newParent;
|
||||
}
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* For the given child queue, set the max resources based on the
|
||||
* parent queue's default child resource settings. This method assumes that
|
||||
* the child queue is ad hoc and hence does not do any safety checks around
|
||||
* overwriting existing max resource settings.
|
||||
*
|
||||
* @param parent the parent queue
|
||||
* @param child the child queue
|
||||
* @param queueConf the {@link AllocationConfiguration}
|
||||
*/
|
||||
void setChildResourceLimits(FSParentQueue parent, FSQueue child,
|
||||
AllocationConfiguration queueConf) {
|
||||
Map<FSQueueType, Set<String>> configuredQueues =
|
||||
queueConf.getConfiguredQueues();
|
||||
|
||||
// Ad hoc queues do not exist in the configured queues map
|
||||
if (!configuredQueues.get(FSQueueType.LEAF).contains(child.getName()) &&
|
||||
!configuredQueues.get(FSQueueType.PARENT).contains(child.getName())) {
|
||||
// For ad hoc queues, set their max reource allocations based on
|
||||
// their parents' default child settings.
|
||||
Resource maxChild = queueConf.getMaxChildResources(parent.getName());
|
||||
|
||||
if (maxChild != null) {
|
||||
queueConf.setMaxResources(child.getName(), maxChild);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make way for the given queue if possible, by removing incompatible
|
||||
* queues with no apps in them. Incompatibility could be due to
|
||||
|
|
|
@ -179,12 +179,16 @@ public class TestAllocationFileLoaderService {
|
|||
out.println("<queue name=\"queueE\">");
|
||||
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
|
||||
out.println("</queue>");
|
||||
//Make queue F a parent queue without configured leaf queues using the 'type' attribute
|
||||
// Make queue F a parent queue without configured leaf queues using the
|
||||
// 'type' attribute
|
||||
out.println("<queue name=\"queueF\" type=\"parent\" >");
|
||||
out.println("<maxChildResources>2048mb,64vcores</maxChildResources>");
|
||||
out.println("</queue>");
|
||||
// Create hierarchical queues G,H, with different min/fair share preemption
|
||||
// timeouts and preemption thresholds
|
||||
// timeouts and preemption thresholds. Also add a child default to make sure
|
||||
// it doesn't impact queue H.
|
||||
out.println("<queue name=\"queueG\">");
|
||||
out.println("<maxChildResources>2048mb,64vcores</maxChildResources>");
|
||||
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
|
||||
out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
|
||||
out.println("<fairSharePreemptionThreshold>0.6</fairSharePreemptionThreshold>");
|
||||
|
@ -240,6 +244,12 @@ public class TestAllocationFileLoaderService {
|
|||
queueConf.getMaxResources("root.queueD"));
|
||||
assertEquals(Resources.createResource(4096, 100),
|
||||
queueConf.getMaxResources("root.queueE"));
|
||||
assertEquals(Resources.createResource(4096, 100),
|
||||
queueConf.getMaxResources("root.queueF"));
|
||||
assertEquals(Resources.createResource(4096, 100),
|
||||
queueConf.getMaxResources("root.queueG"));
|
||||
assertEquals(Resources.createResource(4096, 100),
|
||||
queueConf.getMaxResources("root.queueG.queueH"));
|
||||
|
||||
assertEquals(Resources.createResource(1024, 0),
|
||||
queueConf.getMinResources("root.queueA"));
|
||||
|
@ -251,8 +261,33 @@ public class TestAllocationFileLoaderService {
|
|||
queueConf.getMinResources("root.queueD"));
|
||||
assertEquals(Resources.createResource(0),
|
||||
queueConf.getMinResources("root.queueE"));
|
||||
assertEquals(Resources.createResource(0),
|
||||
queueConf.getMinResources("root.queueF"));
|
||||
assertEquals(Resources.createResource(0),
|
||||
queueConf.getMinResources("root.queueG"));
|
||||
assertEquals(Resources.createResource(0),
|
||||
queueConf.getMinResources("root.queueG.queueH"));
|
||||
|
||||
assertEquals(15, queueConf.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||
assertNull("Max child resources unexpectedly set for queue root.queueA",
|
||||
queueConf.getMaxChildResources("root.queueA"));
|
||||
assertNull("Max child resources unexpectedly set for queue root.queueB",
|
||||
queueConf.getMaxChildResources("root.queueB"));
|
||||
assertNull("Max child resources unexpectedly set for queue root.queueC",
|
||||
queueConf.getMaxChildResources("root.queueC"));
|
||||
assertNull("Max child resources unexpectedly set for queue root.queueD",
|
||||
queueConf.getMaxChildResources("root.queueD"));
|
||||
assertNull("Max child resources unexpectedly set for queue root.queueE",
|
||||
queueConf.getMaxChildResources("root.queueE"));
|
||||
assertEquals(Resources.createResource(2048, 64),
|
||||
queueConf.getMaxChildResources("root.queueF"));
|
||||
assertEquals(Resources.createResource(2048, 64),
|
||||
queueConf.getMaxChildResources("root.queueG"));
|
||||
assertNull("Max child resources unexpectedly set for "
|
||||
+ "queue root.queueG.queueH",
|
||||
queueConf.getMaxChildResources("root.queueG.queueH"));
|
||||
|
||||
assertEquals(15, queueConf.getQueueMaxApps("root."
|
||||
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||
assertEquals(15, queueConf.getQueueMaxApps("root.queueA"));
|
||||
assertEquals(15, queueConf.getQueueMaxApps("root.queueB"));
|
||||
assertEquals(15, queueConf.getQueueMaxApps("root.queueC"));
|
||||
|
|
|
@ -139,8 +139,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
conf = createConfiguration();
|
||||
resourceManager = new MockRM(conf);
|
||||
|
||||
// TODO: This test should really be using MockRM. For now starting stuff
|
||||
// that is needed at a bare minimum.
|
||||
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
|
||||
resourceManager.getRMContext().getStateStore().start();
|
||||
|
||||
|
@ -335,8 +333,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fair shares when max resources are set but are too high to impact
|
||||
* the shares.
|
||||
*
|
||||
* @throws IOException if scheduler reinitialization fails
|
||||
*/
|
||||
@Test
|
||||
public void testFairShareWithMaxResources() throws IOException {
|
||||
public void testFairShareWithHighMaxResources() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
// set queueA and queueB maxResources,
|
||||
// the sum of queueA and queueB maxResources is more than
|
||||
|
@ -376,11 +380,184 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueA", false);
|
||||
// queueA's weight is 0.25, so its fair share should be 2 * 1024.
|
||||
assertEquals(2 * 1024, queue.getFairShare().getMemorySize());
|
||||
assertEquals("Queue A did not get its expected fair share",
|
||||
2 * 1024, queue.getFairShare().getMemorySize());
|
||||
// queueB's weight is 0.75, so its fair share should be 6 * 1024.
|
||||
queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueB", false);
|
||||
assertEquals(6 * 1024, queue.getFairShare().getMemorySize());
|
||||
assertEquals("Queue B did not get its expected fair share",
|
||||
6 * 1024, queue.getFairShare().getMemorySize());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test fair shares when max resources are set and are low enough to impact
|
||||
* the shares.
|
||||
*
|
||||
* @throws IOException if scheduler reinitialization fails
|
||||
*/
|
||||
@Test
|
||||
public void testFairShareWithLowMaxResources() throws IOException {
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println(" <queue name=\"queueA\">");
|
||||
out.println(" <maxResources>1024 mb 1 vcores</maxResources>");
|
||||
out.println(" <weight>0.75</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println(" <queue name=\"queueB\">");
|
||||
out.println(" <maxResources>3072 mb 3 vcores</maxResources>");
|
||||
out.println(" <weight>0.25</weight>");
|
||||
out.println(" </queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add one big node (only care about aggregate capacity)
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
ApplicationAttemptId attId1 =
|
||||
createSchedulingRequest(1024, 1, "queueA", "user1", 2);
|
||||
ApplicationAttemptId attId2 =
|
||||
createSchedulingRequest(1024, 1, "queueB", "user1", 4);
|
||||
|
||||
scheduler.update();
|
||||
|
||||
FSLeafQueue queue =
|
||||
scheduler.getQueueManager().getLeafQueue("queueA", false);
|
||||
// queueA's weight is 0.5, so its fair share should be 6GB, but it's
|
||||
// capped at 1GB.
|
||||
assertEquals("Queue A did not get its expected fair share",
|
||||
1 * 1024, queue.getFairShare().getMemorySize());
|
||||
// queueB's weight is 0.5, so its fair share should be 2GB, but the
|
||||
// other queue is capped at 1GB, so queueB's share is 7GB,
|
||||
// capped at 3GB.
|
||||
queue = scheduler.getQueueManager().getLeafQueue(
|
||||
"queueB", false);
|
||||
assertEquals("Queue B did not get its expected fair share",
|
||||
3 * 1024, queue.getFairShare().getMemorySize());
|
||||
|
||||
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(updateEvent);
|
||||
scheduler.handle(updateEvent);
|
||||
scheduler.handle(updateEvent);
|
||||
scheduler.handle(updateEvent);
|
||||
scheduler.handle(updateEvent);
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
// App 1 should be running with 1 container
|
||||
assertEquals("App 1 is not running with the correct number of containers",
|
||||
1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
||||
// App 2 should be running with 3 containers
|
||||
assertEquals("App 2 is not running with the correct number of containers",
|
||||
3, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the child max resource settings.
|
||||
*
|
||||
* @throws IOException if scheduler reinitialization fails
|
||||
*/
|
||||
@Test
|
||||
public void testChildMaxResources() throws IOException {
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println(" <queue name=\"queueA\" type=\"parent\">");
|
||||
out.println(" <maxChildResources>2048mb,2vcores</maxChildResources>");
|
||||
out.println(" </queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add one big node (only care about aggregate capacity)
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1,
|
||||
"127.0.0.1");
|
||||
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
ApplicationAttemptId attId1 =
|
||||
createSchedulingRequest(1024, 1, "queueA.queueB", "user1", 8);
|
||||
ApplicationAttemptId attId2 =
|
||||
createSchedulingRequest(1024, 1, "queueA.queueC", "user1", 8);
|
||||
|
||||
scheduler.update();
|
||||
|
||||
NodeUpdateSchedulerEvent nodeEvent = new NodeUpdateSchedulerEvent(node1);
|
||||
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
|
||||
// Apps should be running with 2 containers
|
||||
assertEquals("App 1 is not running with the correct number of containers",
|
||||
2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
||||
assertEquals("App 2 is not running with the correct number of containers",
|
||||
2, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
||||
|
||||
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println(" <queue name=\"queueA\" type=\"parent\">");
|
||||
out.println(" <maxChildResources>3072mb,3vcores</maxChildResources>");
|
||||
out.println(" </queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
scheduler.update();
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
scheduler.handle(nodeEvent);
|
||||
|
||||
// Apps should be running with 3 containers now
|
||||
assertEquals("App 1 is not running with the correct number of containers",
|
||||
3, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
||||
assertEquals("App 2 is not running with the correct number of containers",
|
||||
3, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
||||
|
||||
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println(" <queue name=\"queueA\" type=\"parent\">");
|
||||
out.println(" <maxChildResources>1024mb,1vcores</maxChildResources>");
|
||||
out.println(" </queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
scheduler.update();
|
||||
scheduler.handle(nodeEvent);
|
||||
|
||||
// Apps still should be running with 3 containers because we don't preempt
|
||||
assertEquals("App 1 is not running with the correct number of containers",
|
||||
3, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
|
||||
assertEquals("App 2 is not running with the correct number of containers",
|
||||
3, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.HashSet;
|
|||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -33,24 +34,38 @@ public class TestQueueManager {
|
|||
private FairSchedulerConfiguration conf;
|
||||
private QueueManager queueManager;
|
||||
private Set<FSQueue> notEmptyQueues;
|
||||
private FairScheduler scheduler;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new FairSchedulerConfiguration();
|
||||
FairScheduler scheduler = mock(FairScheduler.class);
|
||||
scheduler = mock(FairScheduler.class);
|
||||
|
||||
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
|
||||
|
||||
// Set up some queues to test default child max resource inheritance
|
||||
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test");
|
||||
allocConf.setMaxChildResources("root.test",
|
||||
Resources.createResource(8192, 256));
|
||||
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.test.childA");
|
||||
allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test.childB");
|
||||
|
||||
when(scheduler.getAllocationConfiguration()).thenReturn(allocConf);
|
||||
when(scheduler.getConf()).thenReturn(conf);
|
||||
|
||||
SystemClock clock = SystemClock.getInstance();
|
||||
|
||||
when(scheduler.getClock()).thenReturn(clock);
|
||||
notEmptyQueues = new HashSet<FSQueue>();
|
||||
notEmptyQueues = new HashSet<>();
|
||||
queueManager = new QueueManager(scheduler) {
|
||||
@Override
|
||||
public boolean isEmpty(FSQueue queue) {
|
||||
return !notEmptyQueues.contains(queue);
|
||||
}
|
||||
};
|
||||
|
||||
FSQueueMetrics.forQueue("root", null, true, conf);
|
||||
|
||||
queueManager.initialize(conf);
|
||||
}
|
||||
|
||||
|
@ -143,4 +158,149 @@ public class TestQueueManager {
|
|||
allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
|
||||
queueMgr.updateAllocationConfiguration(allocConf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test simple leaf queue creation.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateLeafQueue() {
|
||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
|
||||
FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.LEAF);
|
||||
|
||||
assertNotNull("Leaf queue root.queue1 was not created",
|
||||
queueManager.getLeafQueue("root.queue1", false));
|
||||
assertEquals("createQueue() returned wrong queue",
|
||||
"root.queue1", q1.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test creation of a leaf queue and its parent.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateLeafQueueAndParent() {
|
||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
|
||||
FSQueue q2 = queueManager.createQueue("root.queue1.queue2",
|
||||
FSQueueType.LEAF);
|
||||
|
||||
assertNotNull("Parent queue root.queue1 was not created",
|
||||
queueManager.getParentQueue("root.queue1", false));
|
||||
assertNotNull("Leaf queue root.queue1.queue2 was not created",
|
||||
queueManager.getLeafQueue("root.queue1.queue2", false));
|
||||
assertEquals("createQueue() returned wrong queue",
|
||||
"root.queue1.queue2", q2.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test creation of leaf and parent child queues when the parent queue has
|
||||
* child defaults set. In this test we rely on the root.test,
|
||||
* root.test.childA and root.test.childB queues that are created in the
|
||||
* {@link #setUp()} method.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateQueueWithChildDefaults() {
|
||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
|
||||
FSQueue q1 = queueManager.createQueue("root.test.childC", FSQueueType.LEAF);
|
||||
|
||||
assertNotNull("Leaf queue root.test.childC was not created",
|
||||
queueManager.getLeafQueue("root.test.childC", false));
|
||||
assertEquals("createQueue() returned wrong queue",
|
||||
"root.test.childC", q1.getName());
|
||||
assertEquals("Max resources for root.queue1 were not inherited from "
|
||||
+ "parent's max child resources", Resources.createResource(8192, 256),
|
||||
allocConf.getMaxResources("root.test.childC"));
|
||||
|
||||
FSQueue q2 = queueManager.createQueue("root.test.childD",
|
||||
FSQueueType.PARENT);
|
||||
|
||||
assertNotNull("Leaf queue root.test.childD was not created",
|
||||
queueManager.getParentQueue("root.test.childD", false));
|
||||
assertEquals("createQueue() returned wrong queue",
|
||||
"root.test.childD", q2.getName());
|
||||
assertEquals("Max resources for root.test.childD were not inherited "
|
||||
+ "from parent's max child resources",
|
||||
Resources.createResource(8192, 256),
|
||||
allocConf.getMaxResources("root.test.childD"));
|
||||
|
||||
// Check that the childA and childB queues weren't impacted
|
||||
// by the child defaults
|
||||
assertNotNull("Leaf queue root.test.childA was not created during setup",
|
||||
queueManager.getLeafQueue("root.test.childA", false));
|
||||
assertEquals("Max resources for root.test.childA were inherited from "
|
||||
+ "parent's max child resources", Resources.unbounded(),
|
||||
allocConf.getMaxResources("root.test.childA"));
|
||||
assertNotNull("Leaf queue root.test.childB was not created during setup",
|
||||
queueManager.getParentQueue("root.test.childB", false));
|
||||
assertEquals("Max resources for root.test.childB were inherited from "
|
||||
+ "parent's max child resources", Resources.unbounded(),
|
||||
allocConf.getMaxResources("root.test.childB"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test creation of a leaf queue with no resource limits.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateLeafQueueWithDefaults() {
|
||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||
FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.LEAF);
|
||||
|
||||
assertNotNull("Leaf queue root.queue1 was not created",
|
||||
queueManager.getLeafQueue("root.queue1", false));
|
||||
assertEquals("createQueue() returned wrong queue",
|
||||
"root.queue1", q1.getName());
|
||||
|
||||
// Min default is 0,0
|
||||
assertEquals("Min resources were not set to default",
|
||||
Resources.createResource(0, 0),
|
||||
allocConf.getMinResources("root.queue1"));
|
||||
|
||||
// Max default is unbounded
|
||||
assertEquals("Max resources were not set to default", Resources.unbounded(),
|
||||
allocConf.getMaxResources("root.queue1"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test creation of a simple parent queue.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateParentQueue() {
|
||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
|
||||
FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.PARENT);
|
||||
|
||||
assertNotNull("Parent queue root.queue1 was not created",
|
||||
queueManager.getParentQueue("root.queue1", false));
|
||||
assertEquals("createQueue() returned wrong queue",
|
||||
"root.queue1", q1.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test creation of a parent queue and its parent.
|
||||
*/
|
||||
@Test
|
||||
public void testCreateParentQueueAndParent() {
|
||||
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
|
||||
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
|
||||
FSQueue q2 = queueManager.createQueue("root.queue1.queue2",
|
||||
FSQueueType.PARENT);
|
||||
|
||||
assertNotNull("Parent queue root.queue1 was not created",
|
||||
queueManager.getParentQueue("root.queue1", false));
|
||||
assertNotNull("Leaf queue root.queue1.queue2 was not created",
|
||||
queueManager.getParentQueue("root.queue1.queue2", false));
|
||||
assertEquals("createQueue() returned wrong queue",
|
||||
"root.queue1.queue2", q2.getName());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,6 +99,8 @@ The allocation file must be in XML format. The format contains five types of ele
|
|||
|
||||
* maxResources: maximum resources a queue is allowed, in the form "X mb, Y vcores". A queue will never be assigned a container that would put its aggregate usage over this limit.
|
||||
|
||||
* maxChildResources: maximum resources an ad hoc child queue is allowed, in the form "X mb, Y vcores". Any ad hoc queue that is a direct child of a queue with this property set will have it's maxResources property set accordingly.
|
||||
|
||||
* maxRunningApps: limit the number of apps from the queue to run at once
|
||||
|
||||
* maxAMShare: limit the fraction of the queue's fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f.
|
||||
|
@ -176,6 +178,7 @@ The allocation file must be in XML format. The format contains five types of ele
|
|||
user queues under it -->
|
||||
<queue name="secondary_group_queue" type="parent">
|
||||
<weight>3.0</weight>
|
||||
<maxChildResources>4096 mb,4vcores</maxChildResources>
|
||||
</queue>
|
||||
|
||||
<user name="sample_user">
|
||||
|
|
Loading…
Reference in New Issue