diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index f984fef0b10..90d7d9885b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -46,6 +46,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { // Maximum amount of resources per queue @VisibleForTesting final Map maxQueueResources; + // Maximum amount of resources for each queue's ad hoc children + private final Map maxChildQueueResources; // Sharing weights for each queue private final Map queueWeights; @@ -107,6 +109,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, + Map maxChildQueueResources, Map queueMaxApps, Map userMaxApps, Map queueWeights, Map queueMaxAMShares, int userMaxAppsDefault, @@ -126,6 +129,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { Set nonPreemptableQueues) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; + this.maxChildQueueResources = maxChildQueueResources; this.queueMaxApps = queueMaxApps; this.userMaxApps = userMaxApps; this.queueMaxAMShares = queueMaxAMShares; @@ -149,31 +153,32 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { } public 
AllocationConfiguration(Configuration conf) { - minQueueResources = new HashMap(); - maxQueueResources = new HashMap(); - queueWeights = new HashMap(); - queueMaxApps = new HashMap(); - userMaxApps = new HashMap(); - queueMaxAMShares = new HashMap(); + minQueueResources = new HashMap<>(); + maxChildQueueResources = new HashMap<>(); + maxQueueResources = new HashMap<>(); + queueWeights = new HashMap<>(); + queueMaxApps = new HashMap<>(); + userMaxApps = new HashMap<>(); + queueMaxAMShares = new HashMap<>(); userMaxAppsDefault = Integer.MAX_VALUE; queueMaxAppsDefault = Integer.MAX_VALUE; queueMaxResourcesDefault = Resources.unbounded(); queueMaxAMShareDefault = 0.5f; - queueAcls = new HashMap>(); - resAcls = new HashMap>(); - minSharePreemptionTimeouts = new HashMap(); - fairSharePreemptionTimeouts = new HashMap(); - fairSharePreemptionThresholds = new HashMap(); - schedulingPolicies = new HashMap(); + queueAcls = new HashMap<>(); + resAcls = new HashMap<>(); + minSharePreemptionTimeouts = new HashMap<>(); + fairSharePreemptionTimeouts = new HashMap<>(); + fairSharePreemptionThresholds = new HashMap<>(); + schedulingPolicies = new HashMap<>(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; reservableQueues = new HashSet<>(); - configuredQueues = new HashMap>(); + configuredQueues = new HashMap<>(); for (FSQueueType queueType : FSQueueType.values()) { configuredQueues.put(queueType, new HashSet()); } - placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, - configuredQueues); - nonPreemptableQueues = new HashSet(); + placementPolicy = + QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); + nonPreemptableQueues = new HashSet<>(); } /** @@ -263,7 +268,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { /** * Get the minimum resource allocation for the given queue. - * @return the cap set on this queue, or 0 if not set. 
+ * + * @param queue the target queue's name + * @return the min allocation on this queue or {@link Resources#none} + * if not set */ public Resource getMinResources(String queue) { Resource minQueueResource = minQueueResources.get(queue); @@ -271,14 +279,26 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { } /** - * Get the maximum resource allocation for the given queue. - * @return the cap set on this queue, or Integer.MAX_VALUE if not set. + * Set the maximum resource allocation for the given queue. + * + * @param queue the target queue + * @param maxResource the maximum resource allocation */ + void setMaxResources(String queue, Resource maxResource) { + maxQueueResources.put(queue, maxResource); + } - public Resource getMaxResources(String queueName) { - Resource maxQueueResource = maxQueueResources.get(queueName); + /** + * Get the maximum resource allocation for the given queue. If the max is not + * set, return the larger of the min and the default max. + * + * @param queue the target queue's name + * @return the max allocation on this queue + */ + public Resource getMaxResources(String queue) { + Resource maxQueueResource = maxQueueResources.get(queue); if (maxQueueResource == null) { - Resource minQueueResource = minQueueResources.get(queueName); + Resource minQueueResource = minQueueResources.get(queue); if (minQueueResource != null && Resources.greaterThan(RESOURCE_CALCULATOR, Resources.unbounded(), minQueueResource, queueMaxResourcesDefault)) { @@ -291,6 +311,27 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { } } + /** + * Get the maximum resource allocation for children of the given queue. 
+ * + * @param queue the target queue's name + * @return the max allocation on this queue or null if not set + */ + public Resource getMaxChildResources(String queue) { + return maxChildQueueResources.get(queue); + } + + /** + * Set the maximum resource allocation for the children of the given queue. + * Use of this method is primarily intended for testing purposes. + * + * @param queue the target queue + * @param maxResource the maximum resource allocation + */ + void setMaxChildResources(String queue, Resource maxResource) { + maxChildQueueResources.put(queue, maxResource); + } + public boolean hasAccess(String queueName, QueueACL acl, UserGroupInformation user) { int lastPeriodIndex = queueName.length(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index fab536d2a51..ee7198122c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -213,23 +213,22 @@ public class AllocationFileLoaderService extends AbstractService { LOG.info("Loading allocation file " + allocFile); // Create some temporary hashmaps to hold the new allocs, and we only save // them in our fields if we have parsed the entire allocs file successfully. 
- Map minQueueResources = new HashMap(); - Map maxQueueResources = new HashMap(); - Map queueMaxApps = new HashMap(); - Map userMaxApps = new HashMap(); - Map queueMaxAMShares = new HashMap(); - Map queueWeights = new HashMap(); - Map queuePolicies = new HashMap(); - Map minSharePreemptionTimeouts = new HashMap(); - Map fairSharePreemptionTimeouts = new HashMap(); - Map fairSharePreemptionThresholds = - new HashMap(); - Map> queueAcls = - new HashMap>(); + Map minQueueResources = new HashMap<>(); + Map maxQueueResources = new HashMap<>(); + Map maxChildQueueResources = new HashMap<>(); + Map queueMaxApps = new HashMap<>(); + Map userMaxApps = new HashMap<>(); + Map queueMaxAMShares = new HashMap<>(); + Map queueWeights = new HashMap<>(); + Map queuePolicies = new HashMap<>(); + Map minSharePreemptionTimeouts = new HashMap<>(); + Map fairSharePreemptionTimeouts = new HashMap<>(); + Map fairSharePreemptionThresholds = new HashMap<>(); + Map> queueAcls = new HashMap<>(); Map> reservationAcls = - new HashMap>(); - Set reservableQueues = new HashSet(); - Set nonPreemptableQueues = new HashSet(); + new HashMap<>(); + Set reservableQueues = new HashSet<>(); + Set nonPreemptableQueues = new HashSet<>(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; Resource queueMaxResourcesDefault = Resources.unbounded(); @@ -250,8 +249,8 @@ public class AllocationFileLoaderService extends AbstractService { // configuredQueues is segregated based on whether it is a leaf queue // or a parent queue. This information is used for creating queues // and also for making queue placement decisions(QueuePlacementRule.java). 
- Map> configuredQueues = - new HashMap>(); + Map> configuredQueues = new HashMap<>(); + for (FSQueueType queueType : FSQueueType.values()) { configuredQueues.put(queueType, new HashSet()); } @@ -368,10 +367,11 @@ public class AllocationFileLoaderService extends AbstractService { parent = null; } loadQueue(parent, element, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, - fairSharePreemptionThresholds, queueAcls, reservationAcls, - configuredQueues, reservableQueues, nonPreemptableQueues); + maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, + queueWeights, queuePolicies, minSharePreemptionTimeouts, + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, + reservationAcls, configuredQueues, reservableQueues, + nonPreemptableQueues); } // Load placement policy and pass it configured queues @@ -413,14 +413,15 @@ public class AllocationFileLoaderService extends AbstractService { globalReservationQueueConfig.setReservationAgent(reservationAgent); } - AllocationConfiguration info = new AllocationConfiguration(minQueueResources, - maxQueueResources, queueMaxApps, userMaxApps, queueWeights, - queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, - queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies, - defaultSchedPolicy, minSharePreemptionTimeouts, - fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, - reservationAcls, newPlacementPolicy, configuredQueues, - globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); + AllocationConfiguration info = + new AllocationConfiguration(minQueueResources, maxQueueResources, + maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights, + queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, + queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies, + defaultSchedPolicy, minSharePreemptionTimeouts, + 
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, + reservationAcls, newPlacementPolicy, configuredQueues, + globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -433,8 +434,11 @@ public class AllocationFileLoaderService extends AbstractService { */ private void loadQueue(String parentName, Element element, Map minQueueResources, - Map maxQueueResources, Map queueMaxApps, - Map userMaxApps, Map queueMaxAMShares, + Map maxQueueResources, + Map maxChildQueueResources, + Map queueMaxApps, + Map userMaxApps, + Map queueMaxAMShares, Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, @@ -463,8 +467,8 @@ public class AllocationFileLoaderService extends AbstractService { if (parentName != null) { queueName = parentName + "." + queueName; } - Map acls = - new HashMap(); + + Map acls = new HashMap<>(); Map racls = new HashMap<>(); NodeList fields = element.getChildNodes(); boolean isLeaf = true; @@ -476,12 +480,19 @@ public class AllocationFileLoaderService extends AbstractService { Element field = (Element) fieldNode; if ("minResources".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); + Resource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); minQueueResources.put(queueName, val); } else if ("maxResources".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text); + Resource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); maxQueueResources.put(queueName, val); + } else if ("maxChildResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + Resource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + 
maxChildQueueResources.put(queueName, val); } else if ("maxRunningApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); @@ -543,8 +554,8 @@ public class AllocationFileLoaderService extends AbstractService { } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, - queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, + maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, + queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, resAcls, configuredQueues, reservableQueues, nonPreemptableQueues); @@ -574,9 +585,8 @@ public class AllocationFileLoaderService extends AbstractService { && !Resources.fitsIn(minQueueResources.get(queueName), maxQueueResources.get(queueName))) { LOG.warn( - String.format( - "Queue %s has max resources %s less than min resources %s", - queueName, maxQueueResources.get(queueName), + String.format("Queue %s has max resources %s less than " + + "min resources %s", queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ac384a11048..73d56d724f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.EnumSet; import java.util.HashSet; @@ -1573,11 +1574,41 @@ public class FairScheduler extends allocConf = queueInfo; allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource()); queueMgr.updateAllocationConfiguration(allocConf); + applyChildDefaults(); maxRunningEnforcer.updateRunnabilityOnReload(); } } } + /** + * After reloading the allocation config, the max resource settings for any + * ad hoc queues will be missing. This method goes through the queue manager's + * queue list and adds back the max resources settings for any ad hoc queues. + * Note that the new max resource settings will be based on the new config. + * The old settings are lost. 
+ */ + private void applyChildDefaults() { + Collection queues = queueMgr.getQueues(); + Set configuredLeafQueues = + allocConf.getConfiguredQueues().get(FSQueueType.LEAF); + Set configuredParentQueues = + allocConf.getConfiguredQueues().get(FSQueueType.PARENT); + + for (FSQueue queue : queues) { + // If the queue is ad hoc and not root, apply the child defaults + if ((queue.getParent() != null) && + !configuredLeafQueues.contains(queue.getName()) && + !configuredParentQueues.contains(queue.getName())) { + Resource max = + allocConf.getMaxChildResources(queue.getParent().getName()); + + if (max != null) { + allocConf.setMaxResources(queue.getName(), max); + } + } + } + } + @Override public List getAppsInQueue(String queueName) { FSQueue queue = queueMgr.getQueue(queueName); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 8d308dc9baa..964c72bb3da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -39,6 +39,9 @@ import org.xml.sax.SAXException; import com.google.common.base.CharMatcher; import com.google.common.annotations.VisibleForTesting; +import java.util.Iterator; +import java.util.Set; +import org.apache.hadoop.yarn.api.records.Resource; /** * Maintains a list of queues as well as scheduling parameters for each queue, * such as guaranteed share allocations, from the fair scheduler config file. 
@@ -173,15 +176,42 @@ public class QueueManager { } /** - * Creates a leaf or parent queue based on what is specified in 'queueType' - * and places it in the tree. Creates any parents that don't already exist. + * Create a leaf or parent queue based on what is specified in + * {@code queueType} and place it in the tree. Create any parents that don't + * already exist. * - * @return - * the created queue, if successful. null if not allowed (one of the parent - * queues in the queue name is already a leaf queue) + * @return the created queue, if successful or null if not allowed (one of the + * parent queues in the queue name is already a leaf queue) */ - private FSQueue createQueue(String name, FSQueueType queueType) { - List newQueueNames = new ArrayList(); + @VisibleForTesting + FSQueue createQueue(String name, FSQueueType queueType) { + List newQueueNames = new ArrayList<>(); + FSParentQueue parent = buildNewQueueList(name, newQueueNames); + FSQueue queue = null; + + if (parent != null) { + // Now that we know everything worked out, make all the queues + // and add them to the map. + queue = createNewQueues(queueType, parent, newQueueNames); + } + + return queue; + } + + /** + * Compile a list of all parent queues of the given queue name that do not + * already exist. The queue names will be added to the {@code newQueueNames} + * list. The list will be in order of increasing queue depth. The first + * element of the list will be the parent closest to the root. The last + * element added will be the queue to be created. This method returns the + * deepest parent that does exist. 
+ * + * @param name the fully qualified name of the queue to create + * @param newQueueNames the list to which to add non-existent queues + * @return the deepest existing parent queue + */ + private FSParentQueue buildNewQueueList(String name, + List newQueueNames) { newQueueNames.add(name); int sepIndex = name.length(); FSParentQueue parent = null; @@ -195,62 +225,120 @@ public class QueueManager { throw new InvalidQueueNameException("Illegal node name at offset " + (sepIndex+1) + " for queue name " + name); } - FSQueue queue; - String curName = null; - curName = name.substring(0, sepIndex); - queue = queues.get(curName); + + String curName = name.substring(0, sepIndex); + FSQueue queue = queues.get(curName); if (queue == null) { - newQueueNames.add(curName); + newQueueNames.add(0, curName); } else { if (queue instanceof FSParentQueue) { parent = (FSParentQueue)queue; - break; - } else { - return null; } - } - } - - // At this point, parent refers to the deepest existing parent of the - // queue to create. - // Now that we know everything worked out, make all the queues - // and add them to the map. 
- AllocationConfiguration queueConf = scheduler.getAllocationConfiguration(); - FSLeafQueue leafQueue = null; - for (int i = newQueueNames.size()-1; i >= 0; i--) { - String queueName = newQueueNames.get(i); - if (i == 0 && queueType != FSQueueType.PARENT) { - leafQueue = new FSLeafQueue(name, scheduler, parent); - try { - leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy()); - } catch (AllocationConfigurationException ex) { - LOG.warn("Failed to set default scheduling policy " - + queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex); - } - parent.addChildQueue(leafQueue); - queues.put(leafQueue.getName(), leafQueue); - leafQueues.add(leafQueue); - leafQueue.updatePreemptionVariables(); - return leafQueue; - } else { - FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); - try { - newParent.setPolicy(queueConf.getDefaultSchedulingPolicy()); - } catch (AllocationConfigurationException ex) { - LOG.warn("Failed to set default scheduling policy " - + queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex); - } - parent.addChildQueue(newParent); - queues.put(newParent.getName(), newParent); - newParent.updatePreemptionVariables(); - parent = newParent; + + // If the queue isn't a parent queue, parent will still be null when + // we break + + break; } } return parent; } + /** + * Create all queues in the {@code newQueueNames} list. The list must be in + * order of increasing depth. All but the last element in the list will be + * created as parent queues. The last element will be created as the type + * specified by the {@code queueType} parameter. The first queue will be + * created as a child of the {@code topParent} queue. All subsequent queues + * will be created as a child of the previously created queue. 
+ * + * @param queueType the type of the last queue to create + * @param topParent the parent of the first queue to create + * @param newQueueNames the list of queues to create + * @return the last queue created + */ + private FSQueue createNewQueues(FSQueueType queueType, + FSParentQueue topParent, List newQueueNames) { + AllocationConfiguration queueConf = scheduler.getAllocationConfiguration(); + Iterator i = newQueueNames.iterator(); + FSParentQueue parent = topParent; + FSQueue queue = null; + + while (i.hasNext()) { + FSParentQueue newParent = null; + String queueName = i.next(); + + // Only create a leaf queue at the very end + if (!i.hasNext() && (queueType != FSQueueType.PARENT)) { + FSLeafQueue leafQueue = new FSLeafQueue(queueName, scheduler, parent); + + try { + leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy()); + } catch (AllocationConfigurationException ex) { + LOG.warn("Failed to set default scheduling policy " + + queueConf.getDefaultSchedulingPolicy() + + " on new leaf queue.", ex); + } + + leafQueues.add(leafQueue); + queue = leafQueue; + } else { + newParent = new FSParentQueue(queueName, scheduler, parent); + + try { + newParent.setPolicy(queueConf.getDefaultSchedulingPolicy()); + } catch (AllocationConfigurationException ex) { + LOG.warn("Failed to set default scheduling policy " + + queueConf.getDefaultSchedulingPolicy() + + " on new parent queue.", ex); + } + + queue = newParent; + } + + parent.addChildQueue(queue); + setChildResourceLimits(parent, queue, queueConf); + queues.put(queue.getName(), queue); + queue.updatePreemptionVariables(); + + // If we just created a leaf node, the newParent is null, but that's OK + // because we only create a leaf node in the very last iteration. + parent = newParent; + } + + return queue; + } + + /** + * For the given child queue, set the max resources based on the + * parent queue's default child resource settings. 
This method assumes that + * the child queue is ad hoc and hence does not do any safety checks around + * overwriting existing max resource settings. + * + * @param parent the parent queue + * @param child the child queue + * @param queueConf the {@link AllocationConfiguration} + */ + void setChildResourceLimits(FSParentQueue parent, FSQueue child, + AllocationConfiguration queueConf) { + Map> configuredQueues = + queueConf.getConfiguredQueues(); + + // Ad hoc queues do not exist in the configured queues map + if (!configuredQueues.get(FSQueueType.LEAF).contains(child.getName()) && + !configuredQueues.get(FSQueueType.PARENT).contains(child.getName())) { + // For ad hoc queues, set their max resource allocations based on + // their parents' default child settings. + Resource maxChild = queueConf.getMaxChildResources(parent.getName()); + + if (maxChild != null) { + queueConf.setMaxResources(child.getName(), maxChild); + } + } + } + /** * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. 
Incompatibility could be due to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 11d49818aac..12c3fa9cc92 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -179,12 +179,16 @@ public class TestAllocationFileLoaderService { out.println(""); out.println("60"); out.println(""); - //Make queue F a parent queue without configured leaf queues using the 'type' attribute + // Make queue F a parent queue without configured leaf queues using the + // 'type' attribute out.println(""); + out.println("2048mb,64vcores"); out.println(""); // Create hierarchical queues G,H, with different min/fair share preemption - // timeouts and preemption thresholds + // timeouts and preemption thresholds. Also add a child default to make sure + // it doesn't impact queue H. 
out.println(""); + out.println("2048mb,64vcores"); out.println("120"); out.println("50"); out.println("0.6"); @@ -240,6 +244,12 @@ public class TestAllocationFileLoaderService { queueConf.getMaxResources("root.queueD")); assertEquals(Resources.createResource(4096, 100), queueConf.getMaxResources("root.queueE")); + assertEquals(Resources.createResource(4096, 100), + queueConf.getMaxResources("root.queueF")); + assertEquals(Resources.createResource(4096, 100), + queueConf.getMaxResources("root.queueG")); + assertEquals(Resources.createResource(4096, 100), + queueConf.getMaxResources("root.queueG.queueH")); assertEquals(Resources.createResource(1024, 0), queueConf.getMinResources("root.queueA")); @@ -251,8 +261,33 @@ public class TestAllocationFileLoaderService { queueConf.getMinResources("root.queueD")); assertEquals(Resources.createResource(0), queueConf.getMinResources("root.queueE")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueF")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueG")); + assertEquals(Resources.createResource(0), + queueConf.getMinResources("root.queueG.queueH")); - assertEquals(15, queueConf.getQueueMaxApps("root." 
+ YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertNull("Max child resources unexpectedly set for queue root.queueA", + queueConf.getMaxChildResources("root.queueA")); + assertNull("Max child resources unexpectedly set for queue root.queueB", + queueConf.getMaxChildResources("root.queueB")); + assertNull("Max child resources unexpectedly set for queue root.queueC", + queueConf.getMaxChildResources("root.queueC")); + assertNull("Max child resources unexpectedly set for queue root.queueD", + queueConf.getMaxChildResources("root.queueD")); + assertNull("Max child resources unexpectedly set for queue root.queueE", + queueConf.getMaxChildResources("root.queueE")); + assertEquals(Resources.createResource(2048, 64), + queueConf.getMaxChildResources("root.queueF")); + assertEquals(Resources.createResource(2048, 64), + queueConf.getMaxChildResources("root.queueG")); + assertNull("Max child resources unexpectedly set for " + + "queue root.queueG.queueH", + queueConf.getMaxChildResources("root.queueG.queueH")); + + assertEquals(15, queueConf.getQueueMaxApps("root." 
+ + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(15, queueConf.getQueueMaxApps("root.queueA")); assertEquals(15, queueConf.getQueueMaxApps("root.queueB")); assertEquals(15, queueConf.getQueueMaxApps("root.queueC")); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 06d5c3bcdf3..77398a2bc5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -139,8 +139,6 @@ public class TestFairScheduler extends FairSchedulerTestBase { conf = createConfiguration(); resourceManager = new MockRM(conf); - // TODO: This test should really be using MockRM. For now starting stuff - // that is needed at a bare minimum. ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); resourceManager.getRMContext().getStateStore().start(); @@ -335,8 +333,14 @@ public class TestFairScheduler extends FairSchedulerTestBase { } } + /** + * Test fair shares when max resources are set but are too high to impact + * the shares. 
+ * + * @throws IOException if scheduler reinitialization fails + */ @Test - public void testFairShareWithMaxResources() throws IOException { + public void testFairShareWithHighMaxResources() throws IOException { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); // set queueA and queueB maxResources, // the sum of queueA and queueB maxResources is more than @@ -376,11 +380,184 @@ public class TestFairScheduler extends FairSchedulerTestBase { FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue( "queueA", false); // queueA's weight is 0.25, so its fair share should be 2 * 1024. - assertEquals(2 * 1024, queue.getFairShare().getMemorySize()); + assertEquals("Queue A did not get its expected fair share", + 2 * 1024, queue.getFairShare().getMemorySize()); // queueB's weight is 0.75, so its fair share should be 6 * 1024. queue = scheduler.getQueueManager().getLeafQueue( "queueB", false); - assertEquals(6 * 1024, queue.getFairShare().getMemorySize()); + assertEquals("Queue B did not get its expected fair share", + 6 * 1024, queue.getFairShare().getMemorySize()); + } + + /** + * Test fair shares when max resources are set and are low enough to impact + * the shares. 
+ * + * @throws IOException if scheduler reinitialization fails + */ + @Test + public void testFairShareWithLowMaxResources() throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + + out.println(""); + out.println(""); + out.println(" "); + out.println(" 1024 mb 1 vcores"); + out.println(" 0.75"); + out.println(" "); + out.println(" "); + out.println(" 3072 mb 3 vcores"); + out.println(" 0.25"); + out.println(" "); + out.println(""); + out.close(); + + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + ApplicationAttemptId attId1 = + createSchedulingRequest(1024, 1, "queueA", "user1", 2); + ApplicationAttemptId attId2 = + createSchedulingRequest(1024, 1, "queueB", "user1", 4); + + scheduler.update(); + + FSLeafQueue queue = + scheduler.getQueueManager().getLeafQueue("queueA", false); + // queueA's weight is 0.75, so its fair share should be 6GB, but it's + // capped at 1GB. + assertEquals("Queue A did not get its expected fair share", + 1 * 1024, queue.getFairShare().getMemorySize()); + // queueB's weight is 0.25, so its fair share should be 2GB, but the + // other queue is capped at 1GB, so queueB's share is 7GB, + // capped at 3GB. 
+ queue = scheduler.getQueueManager().getLeafQueue( + "queueB", false); + assertEquals("Queue B did not get its expected fair share", + 3 * 1024, queue.getFairShare().getMemorySize()); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + scheduler.handle(updateEvent); + scheduler.handle(updateEvent); + scheduler.handle(updateEvent); + scheduler.handle(updateEvent); + scheduler.handle(updateEvent); + + // App 1 should be running with 1 container + assertEquals("App 1 is not running with the correct number of containers", + 1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + // App 2 should be running with 3 containers + assertEquals("App 2 is not running with the correct number of containers", + 3, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + } + + /** + * Test the child max resource settings. + * + * @throws IOException if scheduler reinitialization fails + */ + @Test + public void testChildMaxResources() throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + + out.println(""); + out.println(""); + out.println(" "); + out.println(" 2048mb,2vcores"); + out.println(" "); + out.println(""); + out.close(); + + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + "127.0.0.1"); + + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + ApplicationAttemptId attId1 = + createSchedulingRequest(1024, 1, "queueA.queueB", "user1", 8); + ApplicationAttemptId attId2 = + createSchedulingRequest(1024, 1, "queueA.queueC", "user1", 8); + + scheduler.update(); + + NodeUpdateSchedulerEvent nodeEvent = new NodeUpdateSchedulerEvent(node1); 
+ + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + + // Apps should be running with 2 containers + assertEquals("App 1 is not running with the correct number of containers", + 2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + assertEquals("App 2 is not running with the correct number of containers", + 2, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(" "); + out.println(" 3072mb,3vcores"); + out.println(" "); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + scheduler.handle(nodeEvent); + + // Apps should be running with 3 containers now + assertEquals("App 1 is not running with the correct number of containers", + 3, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + assertEquals("App 2 is not running with the correct number of containers", + 3, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(" "); + out.println(" 1024mb,1vcores"); + out.println(" "); + out.println(""); + out.close(); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(nodeEvent); + + // Apps still should be running with 3 containers because we don't preempt + assertEquals("App 1 is not running with the correct number of containers", + 3, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + assertEquals("App 2 is not running with the correct number of containers", + 3, 
scheduler.getSchedulerApp(attId2).getLiveContainers().size()); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java index 33d44192528..a9c3788d467 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Set; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Before; import org.junit.Test; @@ -33,27 +34,41 @@ public class TestQueueManager { private FairSchedulerConfiguration conf; private QueueManager queueManager; private Set notEmptyQueues; + private FairScheduler scheduler; @Before public void setUp() throws Exception { conf = new FairSchedulerConfiguration(); - FairScheduler scheduler = mock(FairScheduler.class); + scheduler = mock(FairScheduler.class); + AllocationConfiguration allocConf = new AllocationConfiguration(conf); + + // Set up some queues to test default child max resource inheritance + allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test"); + allocConf.setMaxChildResources("root.test", + Resources.createResource(8192, 256)); + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.test.childA"); + allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.test.childB"); + when(scheduler.getAllocationConfiguration()).thenReturn(allocConf); 
when(scheduler.getConf()).thenReturn(conf); + SystemClock clock = SystemClock.getInstance(); + when(scheduler.getClock()).thenReturn(clock); - notEmptyQueues = new HashSet(); + notEmptyQueues = new HashSet<>(); queueManager = new QueueManager(scheduler) { @Override public boolean isEmpty(FSQueue queue) { return !notEmptyQueues.contains(queue); } }; + FSQueueMetrics.forQueue("root", null, true, conf); + queueManager.initialize(conf); } - + @Test public void testReloadTurnsLeafQueueIntoParent() throws Exception { updateConfiguredLeafQueues(queueManager, "queue1"); @@ -143,4 +158,149 @@ public class TestQueueManager { allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues)); queueMgr.updateAllocationConfiguration(allocConf); } + + /** + * Test simple leaf queue creation. + */ + @Test + public void testCreateLeafQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.LEAF); + + assertNotNull("Leaf queue root.queue1 was not created", + queueManager.getLeafQueue("root.queue1", false)); + assertEquals("createQueue() returned wrong queue", + "root.queue1", q1.getName()); + } + + /** + * Test creation of a leaf queue and its parent. 
+ */ + @Test + public void testCreateLeafQueueAndParent() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q2 = queueManager.createQueue("root.queue1.queue2", + FSQueueType.LEAF); + + assertNotNull("Parent queue root.queue1 was not created", + queueManager.getParentQueue("root.queue1", false)); + assertNotNull("Leaf queue root.queue1.queue2 was not created", + queueManager.getLeafQueue("root.queue1.queue2", false)); + assertEquals("createQueue() returned wrong queue", + "root.queue1.queue2", q2.getName()); + } + + /** + * Test creation of leaf and parent child queues when the parent queue has + * child defaults set. In this test we rely on the root.test, + * root.test.childA and root.test.childB queues that are created in the + * {@link #setUp()} method. + */ + @Test + public void testCreateQueueWithChildDefaults() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.createQueue("root.test.childC", FSQueueType.LEAF); + + assertNotNull("Leaf queue root.test.childC was not created", + queueManager.getLeafQueue("root.test.childC", false)); + assertEquals("createQueue() returned wrong queue", + "root.test.childC", q1.getName()); + assertEquals("Max resources for root.queue1 were not inherited from " + + "parent's max child resources", Resources.createResource(8192, 256), + allocConf.getMaxResources("root.test.childC")); + + FSQueue q2 = queueManager.createQueue("root.test.childD", + FSQueueType.PARENT); + + assertNotNull("Leaf queue root.test.childD was not created", + queueManager.getParentQueue("root.test.childD", false)); + assertEquals("createQueue() returned wrong queue", + "root.test.childD", q2.getName()); + assertEquals("Max resources for root.test.childD were not inherited " + + "from parent's max child resources", + 
Resources.createResource(8192, 256), + allocConf.getMaxResources("root.test.childD")); + + // Check that the childA and childB queues weren't impacted + // by the child defaults + assertNotNull("Leaf queue root.test.childA was not created during setup", + queueManager.getLeafQueue("root.test.childA", false)); + assertEquals("Max resources for root.test.childA were inherited from " + + "parent's max child resources", Resources.unbounded(), + allocConf.getMaxResources("root.test.childA")); + assertNotNull("Leaf queue root.test.childB was not created during setup", + queueManager.getParentQueue("root.test.childB", false)); + assertEquals("Max resources for root.test.childB were inherited from " + + "parent's max child resources", Resources.unbounded(), + allocConf.getMaxResources("root.test.childB")); + } + + /** + * Test creation of a leaf queue with no resource limits. + */ + @Test + public void testCreateLeafQueueWithDefaults() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.LEAF); + + assertNotNull("Leaf queue root.queue1 was not created", + queueManager.getLeafQueue("root.queue1", false)); + assertEquals("createQueue() returned wrong queue", + "root.queue1", q1.getName()); + + // Min default is 0,0 + assertEquals("Min resources were not set to default", + Resources.createResource(0, 0), + allocConf.getMinResources("root.queue1")); + + // Max default is unbounded + assertEquals("Max resources were not set to default", Resources.unbounded(), + allocConf.getMaxResources("root.queue1")); + } + + /** + * Test creation of a simple parent queue. 
+ */ + @Test + public void testCreateParentQueue() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.PARENT); + + assertNotNull("Parent queue root.queue1 was not created", + queueManager.getParentQueue("root.queue1", false)); + assertEquals("createQueue() returned wrong queue", + "root.queue1", q1.getName()); + } + + /** + * Test creation of a parent queue and its parent. + */ + @Test + public void testCreateParentQueueAndParent() { + AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); + + queueManager.updateAllocationConfiguration(allocConf); + + FSQueue q2 = queueManager.createQueue("root.queue1.queue2", + FSQueueType.PARENT); + + assertNotNull("Parent queue root.queue1 was not created", + queueManager.getParentQueue("root.queue1", false)); + assertNotNull("Leaf queue root.queue1.queue2 was not created", + queueManager.getParentQueue("root.queue1.queue2", false)); + assertEquals("createQueue() returned wrong queue", + "root.queue1.queue2", q2.getName()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md index 097acf44ddf..3a15f1c5050 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/FairScheduler.md @@ -97,6 +97,8 @@ The allocation file must be in XML format. The format contains five types of ele * maxResources: maximum resources a queue is allowed, in the form "X mb, Y vcores". A queue will never be assigned a container that would put its aggregate usage over this limit. + * maxChildResources: maximum resources an ad hoc child queue is allowed, in the form "X mb, Y vcores". 
Any ad hoc queue that is a direct child of a queue with this property set will have its maxResources property set accordingly. + * maxRunningApps: limit the number of apps from the queue to run at once * maxAMShare: limit the fraction of the queue's fair share that can be used to run application masters. This property can only be used for leaf queues. For example, if set to 1.0f, then AMs in the leaf queue can take up to 100% of both the memory and CPU fair share. The value of -1.0f will disable this feature and the amShare will not be checked. The default value is 0.5f. @@ -174,6 +176,7 @@ The allocation file must be in XML format. The format contains five types of ele user queues under it --> 3.0 + 4096 mb,4vcores