diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java index f2f92b81fbc..3ae9ac4c545 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java @@ -42,6 +42,10 @@ public class ApplicationPlacementContext { return queue; } + public void setQueue(String q) { + queue = q; + } + public String getParentQueue() { return parentQueue; } @@ -49,4 +53,13 @@ public class ApplicationPlacementContext { public boolean hasParentQueue() { return parentQueue != null; } + + public String getFullQueuePath() { + if (parentQueue != null) { + return parentQueue + "." + queue; + } else { + return queue; + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9e7b0d8f634..fd144f23cd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -151,6 +151,14 @@ public abstract class AbstractCSQueue implements CSQueue { private Map userWeights = new HashMap(); private int maxParallelApps; + // is it a dynamic queue? + private boolean dynamicQueue = false; + + // When this queue has application submit to? + // This property only applies to dynamic queue, + // and will be used to check when the queue need to be removed. + private long lastSubmittedTimestamp; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -172,7 +180,7 @@ public abstract class AbstractCSQueue implements CSQueue { this.metrics = old != null ? (CSQueueMetrics) old.getMetrics() : CSQueueMetrics.forQueue(getQueuePath(), parent, - cs.getConfiguration().getEnableUserMetrics(), cs.getConf()); + cs.getConfiguration().getEnableUserMetrics(), configuration); this.csContext = cs; this.minimumAllocation = csContext.getMinimumResourceCapability(); @@ -192,6 +200,7 @@ public abstract class AbstractCSQueue implements CSQueue { writeLock = lock.writeLock(); } + @VisibleForTesting protected void setupConfigurableCapacities() { setupConfigurableCapacities(csContext.getConfiguration()); } @@ -345,11 +354,6 @@ public abstract class AbstractCSQueue implements CSQueue { return defaultLabelExpression; } - void setupQueueConfigs(Resource clusterResource) - throws IOException { - setupQueueConfigs(clusterResource, csContext.getConfiguration()); - } - protected void setupQueueConfigs(Resource clusterResource, CapacitySchedulerConfiguration configuration) throws IOException { @@ -405,7 +409,7 @@ public abstract class AbstractCSQueue implements CSQueue { QueueState parentState = (parent == null) ? null : parent.getState(); initializeQueueState(previous, configuredState, parentState); - authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); + authorizer = YarnAuthorizationProvider.getInstance(configuration); this.acls = configuration.getAcls(getQueuePath()); @@ -437,7 +441,7 @@ public abstract class AbstractCSQueue implements CSQueue { } this.reservationsContinueLooking = - csContext.getConfiguration().getReservationContinueLook(); + configuration.getReservationContinueLook(); this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this, configuration); @@ -1609,4 +1613,38 @@ public abstract class AbstractCSQueue implements CSQueue { } } } + + public boolean isDynamicQueue() { + readLock.lock(); + + try { + return dynamicQueue; + } finally { + readLock.unlock(); + } + } + + public void setDynamicQueue(boolean dynamicQueue) { + writeLock.lock(); + + try { + this.dynamicQueue = dynamicQueue; + } finally { + writeLock.unlock(); + } + } + + public long getLastSubmittedTimestamp() { + return lastSubmittedTimestamp; + } + + // "Tab" the queue, so this queue won't be removed because of idle timeout. + public void signalToSubmitToQueue() { + writeLock.lock(); + try { + this.lastSubmittedTimestamp = System.currentTimeMillis(); + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index 7bdc311aea1..a9e82a6f067 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -58,7 +58,7 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { writeLock.lock(); try { // Set new configs - setupQueueConfigs(clusterResource); + setupQueueConfigs(clusterResource, csContext.getConfiguration()); } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 3fc256b218a..a4034768387 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -40,7 +41,7 @@ public class CSQueueUtils { float capacity, float maximumCapacity) { if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) { throw new IllegalArgumentException( - "Illegal value of maximumCapacity " + maximumCapacity + + "Illegal value of maximumCapacity " + maximumCapacity + " used in call to setMaxCapacity for queue " + queuePath); } } @@ -61,11 +62,11 @@ public class CSQueueUtils { public static float computeAbsoluteMaximumCapacity( float maximumCapacity, CSQueue parent) { - float parentAbsMaxCapacity = + float parentAbsMaxCapacity = (parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity(); return (parentAbsMaxCapacity * maximumCapacity); } - + public static void loadCapacitiesByLabelsFromConf(String queuePath, QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) { queueCapacities.clearConfigurableFields(); @@ -312,4 +313,15 @@ public class CSQueueUtils { } } } + + public static ApplicationPlacementContext extractQueuePath(String queuePath) { + int parentQueueNameEndIndex = queuePath.lastIndexOf("."); + if (parentQueueNameEndIndex > -1) { + String parent = queuePath.substring(0, parentQueueNameEndIndex).trim(); + String leaf = queuePath.substring(parentQueueNameEndIndex + 1).trim(); + return new ApplicationPlacementContext(leaf, parent); + } else{ + return new ApplicationPlacementContext(queuePath); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 86f3023a221..89c1cf752ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -230,6 +230,8 @@ public class CapacityScheduler extends private AppPriorityACLsManager appPriorityACLManager; private boolean multiNodePlacementEnabled; + private CapacitySchedulerAutoQueueHandler autoQueueHandler; + private static boolean printedVerboseLoggingForAsyncScheduling = false; /** @@ -340,6 +342,9 @@ public class CapacityScheduler extends this.labelManager, this.appPriorityACLManager); this.queueManager.setCapacitySchedulerContext(this); + this.autoQueueHandler = new CapacitySchedulerAutoQueueHandler( + this.queueManager, this.conf); + this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager(); this.activitiesManager = new ActivitiesManager(rmContext); @@ -924,7 +929,7 @@ public class CapacityScheduler extends private CSQueue getOrCreateQueueFromPlacementContext(ApplicationId applicationId, String user, String queueName, ApplicationPlacementContext placementContext, - boolean isRecovery) { + boolean isRecovery) { CSQueue queue = getQueue(queueName); @@ -3329,44 +3334,6 @@ public class CapacityScheduler extends return null; } - private LeafQueue autoCreateLeafQueue( - ApplicationPlacementContext placementContext) - throws IOException, YarnException { - - AutoCreatedLeafQueue autoCreatedLeafQueue = null; - - String leafQueueName = placementContext.getQueue(); - String parentQueueName = placementContext.getParentQueue(); - - if (!StringUtils.isEmpty(parentQueueName)) { - CSQueue parentQueue = getQueue(parentQueueName); - - if (parentQueue != null && conf.isAutoCreateChildQueueEnabled( - parentQueue.getQueuePath())) { - - ManagedParentQueue autoCreateEnabledParentQueue = - (ManagedParentQueue) parentQueue; - autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName, - autoCreateEnabledParentQueue); - - addQueue(autoCreatedLeafQueue); - - } else{ - throw new SchedulerDynamicEditException( - "Could not auto-create leaf queue for " + leafQueueName - + ". Queue mapping specifies an invalid parent queue " - + "which does not exist " - + parentQueueName); - } - } else{ - throw new SchedulerDynamicEditException( - "Could not auto-create leaf queue for " + leafQueueName - + ". Queue mapping does not specify" - + " which parent queue it needs to be created under."); - } - return autoCreatedLeafQueue; - } - @Override public void resetSchedulerMetrics() { CapacitySchedulerMetrics.destroy(); @@ -3403,4 +3370,43 @@ public class CapacityScheduler extends public void setQueueManager(CapacitySchedulerQueueManager qm) { this.queueManager = qm; } + + private LeafQueue autoCreateLeafQueue( + ApplicationPlacementContext placementContext) + throws IOException, YarnException { + String leafQueueName = placementContext.getQueue(); + String parentQueueName = placementContext.getParentQueue(); + + if (!StringUtils.isEmpty(parentQueueName)) { + CSQueue parentQueue = getQueue(parentQueueName); + + if (parentQueue == null) { + throw new SchedulerDynamicEditException( + "Could not auto-create leaf queue for " + leafQueueName + + ". Queue mapping specifies an invalid parent queue " + + "which does not exist " + parentQueueName); + } + + if (parentQueue != null && + conf.isAutoCreateChildQueueEnabled(parentQueue.getQueuePath())) { + // Case 1: Handle ManagedParentQueue + AutoCreatedLeafQueue autoCreatedLeafQueue = null; + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName, + autoCreateEnabledParentQueue); + + addQueue(autoCreatedLeafQueue); + return autoCreatedLeafQueue; + + } else { + return autoQueueHandler.autoCreateQueue(placementContext); + } + } + + throw new SchedulerDynamicEditException( + "Could not auto-create leaf queue for " + leafQueueName + + ". Queue mapping does not specify" + + " which parent queue it needs to be created under."); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerAutoQueueHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerAutoQueueHandler.java new file mode 100644 index 00000000000..1730021b22f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerAutoQueueHandler.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Manages the validation and the creation of a Capacity Scheduler + * queue at runtime. + */ +public class CapacitySchedulerAutoQueueHandler { + private final CapacitySchedulerQueueManager queueManager; + private final CapacitySchedulerConfiguration conf; + private static final int MAXIMUM_DEPTH_ALLOWED = 2; + + public CapacitySchedulerAutoQueueHandler( + CapacitySchedulerQueueManager queueManager, + CapacitySchedulerConfiguration conf) { + this.queueManager = queueManager; + this.conf = conf; + } + + /** + * Creates a LeafQueue and its upper hierarchy given a path. A parent is + * eligible for creation if either the placement context creation flags are + * set, or the auto queue creation is enabled for the first static parent in + * the hierarchy. + * + * @param queue the application placement information of the queue + * @return LeafQueue part of a given queue path + * @throws YarnException if the given path is not eligible to be auto created + */ + public LeafQueue autoCreateQueue(ApplicationPlacementContext queue) + throws YarnException { + ApplicationPlacementContext parentContext = + CSQueueUtils.extractQueuePath(queue.getParentQueue()); + List parentsToCreate = new ArrayList<>(); + + ApplicationPlacementContext queueCandidateContext = parentContext; + CSQueue existingQueueCandidate = getQueue(queueCandidateContext.getQueue()); + + while (existingQueueCandidate == null) { + parentsToCreate.add(queueCandidateContext); + queueCandidateContext = CSQueueUtils.extractQueuePath( + queueCandidateContext.getParentQueue()); + existingQueueCandidate = getQueue(queueCandidateContext.getQueue()); + } + + // Reverse the collection to to represent the hierarchy to be created + // from highest to lowest level + Collections.reverse(parentsToCreate); + + if (!(existingQueueCandidate instanceof ParentQueue)) { + throw new SchedulerDynamicEditException( + "Could not auto create hierarchy of " + + queue.getFullQueuePath() + ". Queue " + + existingQueueCandidate.getQueuePath() + + " is not a ParentQueue." + ); + } + ParentQueue existingParentQueue = (ParentQueue) existingQueueCandidate; + int depthLimit = extractDepthLimit(existingParentQueue); + // The number of levels to be created including the LeafQueue + // (which is last) + int levelsToCreate = parentsToCreate.size() + 1; + + if (depthLimit == 0) { + throw new SchedulerDynamicEditException("Auto creation of queue " + + queue.getFullQueuePath() + " is not enabled under parent " + + existingParentQueue.getQueuePath()); + } + + if (levelsToCreate > depthLimit) { + throw new SchedulerDynamicEditException( + "Could not auto create queue " + queue.getFullQueuePath() + + ". In order to create the desired queue hierarchy, " + + levelsToCreate + " levels of queues would need " + + "to be created, which is above the limit."); + } + + for (ApplicationPlacementContext current : parentsToCreate) { + existingParentQueue = existingParentQueue + .addDynamicParentQueue(current.getFullQueuePath()); + queueManager.addQueue(existingParentQueue.getQueuePath(), + existingParentQueue); + } + + LeafQueue leafQueue = existingParentQueue.addDynamicLeafQueue( + queue.getFullQueuePath()); + queueManager.addQueue(leafQueue.getQueuePath(), leafQueue); + + return leafQueue; + } + + private int extractDepthLimit(ParentQueue parentQueue) { + if (parentQueue.isEligibleForAutoQueueCreation()) { + return MAXIMUM_DEPTH_ALLOWED; + } else { + return 0; + } + } + + private CSQueue getQueue(String queue) { + return queue != null ? queueManager.getQueue(queue) : null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java index c3b4df4efdf..ef9f97aee18 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java @@ -106,6 +106,10 @@ public final class CapacitySchedulerConfigValidator { } } + private static boolean isDynamicQueue(CSQueue csQueue) { + return ((AbstractCSQueue)csQueue).isDynamicQueue(); + } + /** * Ensure all existing queues are present. Queues cannot be deleted if its not * in Stopped state, Queue's cannot be moved from one hierarchy to other also. @@ -144,10 +148,12 @@ public final class CapacitySchedulerConfigValidator { LOG.info("Deleting Queue " + queuePath + ", as it is not" + " present in the modified capacity configuration xml"); } else { - throw new IOException(oldQueue.getQueuePath() + " cannot be" - + " deleted from the capacity scheduler configuration, as the" - + " queue is not yet in stopped state. Current State : " - + oldQueue.getState()); + if (!isDynamicQueue(oldQueue)) { + throw new IOException(oldQueue.getQueuePath() + " cannot be" + + " deleted from the capacity scheduler configuration, as the" + + " queue is not yet in stopped state. Current State : " + + oldQueue.getState()); + } } } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { //Queue's cannot be moved from one hierarchy to other diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 9188cec0e14..abbc2d7875f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -526,7 +526,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur throwExceptionForUnexpectedWeight(weight, queue, label); return weight; } - + public float getNonLabeledQueueCapacity(String queue) { String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY); boolean absoluteResourceConfigured = (configuredCapacity != null) @@ -2008,6 +2008,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled"; + @Private + private static final String AUTO_QUEUE_CREATION_V2_PREFIX = + "auto-queue-creation-v2"; + + @Private + public static final String AUTO_QUEUE_CREATION_V2_ENABLED = + AUTO_QUEUE_CREATION_V2_PREFIX + ".enabled"; + + @Private + public static final boolean DEFAULT_AUTO_QUEUE_CREATION_ENABLED = false; + @Private public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = "leaf-queue-template"; @@ -2044,6 +2055,20 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur autoCreationEnabled); } + public void setAutoQueueCreationV2Enabled(String queuePath, + boolean autoQueueCreation) { + setBoolean( + getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED, + autoQueueCreation); + } + + public boolean isAutoQueueCreationV2Enabled(String queuePath) { + boolean isAutoQueueCreation = getBoolean( + getQueuePrefix(queuePath) + AUTO_QUEUE_CREATION_V2_ENABLED, + DEFAULT_AUTO_QUEUE_CREATION_ENABLED); + return isAutoQueueCreation; + } + /** * Get the auto created leaf queue's template configuration prefix * Leaf queue's template capacities are configured at the parent queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index a3d65710b9f..c5ce700eef5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -176,7 +178,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< throws IOException { // Parse new queues CSQueueStore newQueues = new CSQueueStore(); - CSQueue newRoot = parseQueue(this.csContext, newConf, null, + CSQueue newRoot = parseQueue(this.csContext, newConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP); // When failing over, if using configuration store, don't validate queue @@ -212,7 +214,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< * @param conf the CapacitySchedulerConfiguration * @param parent the parent queue * @param queueName the queue name - * @param queues all the queues + * @param newQueues all the queues * @param oldQueues the old queues * @param hook the queue hook * @return the CSQueue @@ -222,18 +224,28 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< CapacitySchedulerContext csContext, CapacitySchedulerConfiguration conf, CSQueue parent, String queueName, - CSQueueStore queues, + CSQueueStore newQueues, CSQueueStore oldQueues, QueueHook hook) throws IOException { CSQueue queue; String fullQueueName = (parent == null) ? queueName : (parent.getQueuePath() + "." + queueName); - String[] childQueueNames = conf.getQueues(fullQueueName); + String[] staticChildQueueNames = conf.getQueues(fullQueueName); + List childQueueNames = staticChildQueueNames != null ? + Arrays.asList(staticChildQueueNames) : Collections.emptyList(); + boolean isReservableQueue = conf.isReservable(fullQueueName); boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled( fullQueueName); - if (childQueueNames == null || childQueueNames.length == 0) { + boolean isDynamicParent = false; + + CSQueue oldQueue = oldQueues.get(fullQueueName); + if (oldQueue instanceof ParentQueue) { + isDynamicParent = ((ParentQueue) oldQueue).isDynamicQueue(); + } + + if (childQueueNames.size() == 0 && !isDynamicParent) { if (null == parent) { throw new IllegalStateException( "Queue configuration missing child queue names for " + queueName); @@ -258,7 +270,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< } childQueues.add(resQueue); ((PlanQueue) queue).setChildQueues(childQueues); - queues.add(resQueue); + newQueues.add(resQueue); } else if (isAutoCreateEnabled) { queue = new ManagedParentQueue(csContext, queueName, parent, @@ -291,14 +303,14 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< List childQueues = new ArrayList<>(); for (String childQueueName : childQueueNames) { CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName, - queues, oldQueues, hook); + newQueues, oldQueues, hook); childQueues.add(childQueue); } parentQueue.setChildQueues(childQueues); } - queues.add(queue); + newQueues.add(queue); LOG.info("Initialized queue: " + fullQueueName); return queue; @@ -320,11 +332,12 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager< } } - for (CSQueue queue: existingQueues.getQueues()) { - if (newQueues.get(queue.getQueuePath()) == null && !( + for (CSQueue queue : existingQueues.getQueues()) { + if (!((AbstractCSQueue) queue).isDynamicQueue() && newQueues.get( + queue.getQueuePath()) == null && !( queue instanceof AutoCreatedLeafQueue && conf .isAutoCreateChildQueueEnabled( - queue.getParent().getQueuePath()))) { + queue.getParent().getQueuePath()))) { existingQueues.remove(queue); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1e6f581918a..15c321fca0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -168,11 +168,6 @@ public class LeafQueue extends AbstractCSQueue { } - protected void setupQueueConfigs(Resource clusterResource) - throws IOException { - setupQueueConfigs(clusterResource, csContext.getConfiguration()); - } - @SuppressWarnings("checkstyle:nowhitespaceafter") protected void setupQueueConfigs(Resource clusterResource, CapacitySchedulerConfiguration conf) throws @@ -529,6 +524,13 @@ public class LeafQueue extends AbstractCSQueue { writeLock.lock(); try { + // We skip reinitialize for dynamic queues, when this is called, and + // new queue is different from this queue, we will make this queue to be + // static queue. + if (newlyParsedQueue != this) { + this.setDynamicQueue(false); + } + // Sanity check if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -552,11 +554,6 @@ public class LeafQueue extends AbstractCSQueue { } setupQueueConfigs(clusterResource, configuration); - - // queue metrics are updated, more resource may be available - // activate the pending applications if possible - activateApplications(); - } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index fc848c68473..0a2f0820070 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -31,6 +31,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -108,11 +109,18 @@ public class ParentQueue extends AbstractCSQueue { public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { + this(cs, cs.getConfiguration(), queueName, parent, old); + } + + private ParentQueue(CapacitySchedulerContext cs, + CapacitySchedulerConfiguration csConf, String queueName, CSQueue parent, + CSQueue old) + throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; this.rootQueue = (parent == null); - float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath()); + float rawCapacity = csConf.getNonLabeledQueueCapacity(getQueuePath()); if (rootQueue && (rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) { @@ -125,7 +133,7 @@ public class ParentQueue extends AbstractCSQueue { this.allowZeroCapacitySum = cs.getConfiguration().getAllowZeroCapacitySum(getQueuePath()); - setupQueueConfigs(cs.getClusterResource()); + setupQueueConfigs(cs.getClusterResource(), csConf); LOG.info("Initialized parent-queue " + queueName + " name=" + queueName + @@ -139,11 +147,12 @@ public class ParentQueue extends AbstractCSQueue { queueOrderingPolicy.getConfigName(); } - protected void setupQueueConfigs(Resource clusterResource) + protected void setupQueueConfigs(Resource clusterResource, + CapacitySchedulerConfiguration csConf) throws IOException { writeLock.lock(); try { - super.setupQueueConfigs(clusterResource); + super.setupQueueConfigs(clusterResource, csConf); StringBuilder aclsString = new StringBuilder(); for (Map.Entry e : acls.entrySet()) { aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); @@ -158,7 +167,7 @@ public class ParentQueue extends AbstractCSQueue { } // Initialize queue ordering policy - queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy( + queueOrderingPolicy = csConf.getQueueOrderingPolicy( getQueuePath(), parent == null ? null : ((ParentQueue) parent).getQueueOrderingPolicyConfigName()); @@ -247,14 +256,11 @@ public class ParentQueue extends AbstractCSQueue { + "double check, details:" + diagMsg.toString()); } - if (weightIsSet) { + if (weightIsSet || queues.isEmpty()) { return QueueCapacityType.WEIGHT; } else if (absoluteMinResSet) { return QueueCapacityType.ABSOLUTE_RESOURCE; - } else if (percentageIsSet) { - return QueueCapacityType.PERCENT; } else { - // When all values equals to 0, consider it is a percent mode. return QueueCapacityType.PERCENT; } } @@ -464,12 +470,132 @@ public class ParentQueue extends AbstractCSQueue { "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); } + + private CapacitySchedulerConfiguration getConfForAutoCreatedQueue( + String childQueuePath, boolean isLeaf) { + // Copy existing config + CapacitySchedulerConfiguration dupCSConfig = + new CapacitySchedulerConfiguration( + csContext.getConfiguration(), false); + if (isLeaf) { + // FIXME: Ideally we should disable user limit factor, see YARN-10531 + // dupCSConfig.setUserLimitFactor(childQueuePath, ); + + // Set Max AM percentage to a higher value + dupCSConfig.setMaximumApplicationMasterResourcePerQueuePercent( + childQueuePath, 0.5f); + } + + return dupCSConfig; + } + + private CSQueue createNewQueue(String childQueuePath, boolean isLeaf) + throws SchedulerDynamicEditException { + try { + AbstractCSQueue childQueue; + String queueShortName = childQueuePath.substring( + childQueuePath.lastIndexOf(".") + 1); + + if (isLeaf) { + childQueue = new LeafQueue(csContext, + getConfForAutoCreatedQueue(childQueuePath, isLeaf), queueShortName, + this, null); + } else{ + childQueue = new ParentQueue(csContext, + getConfForAutoCreatedQueue(childQueuePath, isLeaf), queueShortName, + this, null); + } + childQueue.setDynamicQueue(true); + // It should be sufficient now, we don't need to set more, because weights + // related setup will be handled in updateClusterResources + + return childQueue; + } catch (IOException e) { + throw new SchedulerDynamicEditException(e.toString()); + } + } + + public ParentQueue addDynamicParentQueue(String queuePath) + throws SchedulerDynamicEditException { + return (ParentQueue) addDynamicChildQueue(queuePath, false); + } + + public LeafQueue addDynamicLeafQueue(String queuePath) + throws SchedulerDynamicEditException { + return (LeafQueue) addDynamicChildQueue(queuePath, true); + } + + // New method to add child queue + private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf) + throws SchedulerDynamicEditException { + writeLock.lock(); + try { + // Check if queue exists, if queue exists, write a warning message (this + // should not happen, since it will be handled before calling this method) + // , but we will move on. + CSQueue queue = + csContext.getCapacitySchedulerQueueManager().getQueueByFullName( + childQueuePath); + if (queue != null) { + LOG.warn( + "This should not happen, trying to create queue=" + childQueuePath + + ", however the queue already exists"); + return queue; + } + + // First, check if we allow creation or not + boolean weightsAreUsed = false; + try { + weightsAreUsed = getCapacityConfigurationTypeForQueues(childQueues) + == QueueCapacityType.WEIGHT; + } catch (IOException e) { + LOG.warn("Caught Exception during auto queue creation", e); + } + if (!weightsAreUsed) { + throw new SchedulerDynamicEditException( + "Trying to create new queue=" + childQueuePath + + " but not all the queues under parent=" + this.getQueuePath() + + " are using weight-based capacity. Failed to created queue"); + } + + CSQueue newQueue = createNewQueue(childQueuePath, isLeaf); + this.childQueues.add(newQueue); + + // Call updateClusterResource + // , which will deal with all effectiveMin/MaxResource + // Calculation + this.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(this.csContext.getClusterResource())); + + return newQueue; + } finally { + writeLock.unlock(); + } + } + + /** + * Check whether this queue supports adding additional child queues + * dynamically. + * @return true, if queue is eligible to create additional queues dynamically, + * false otherwise + */ + public boolean isEligibleForAutoQueueCreation() { + return isDynamicQueue() || csContext.getConfiguration(). + isAutoQueueCreationV2Enabled(getQueuePath()); + } @Override public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { writeLock.lock(); try { + // We skip reinitialize for dynamic queues, when this is called, and + // new queue is different from this queue, we will make this queue to be + // static queue. + if (newlyParsedQueue != this) { + this.setDynamicQueue(false); + } + // Sanity check if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { @@ -481,7 +607,7 @@ public class ParentQueue extends AbstractCSQueue { ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue; // Set new configs - setupQueueConfigs(clusterResource); + setupQueueConfigs(clusterResource, csContext.getConfiguration()); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! @@ -537,6 +663,10 @@ public class ParentQueue extends AbstractCSQueue { Map.Entry e = itr.next(); String queueName = e.getKey(); if (!newChildQueues.containsKey(queueName)) { + if (((AbstractCSQueue)e.getValue()).isDynamicQueue()) { + // Don't remove dynamic queue if we cannot find it in the config. + continue; + } itr.remove(); } } @@ -1045,11 +1175,26 @@ public class ParentQueue extends AbstractCSQueue { // below calculation for effective capacities updateAbsoluteCapacities(); + // Normalize all dynamic queue queue's weight to 1 for all accessible node + // labels, this is important because existing node labels could keep + // changing when new node added, or node label mapping changed. We need + // this to ensure auto created queue can access all labels. + for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { + for (CSQueue queue : childQueues) { + // For dynamic queue, we will set weight to 1 every time, because it + // is possible new labels added to the parent. + if (((AbstractCSQueue) queue).isDynamicQueue()) { + queue.getQueueCapacities().setWeight(nodeLabel, 1f); + } + } + } + // Normalize weight of children if (getCapacityConfigurationTypeForQueues(childQueues) == QueueCapacityType.WEIGHT) { for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { float sumOfWeight = 0; + for (CSQueue queue : childQueues) { float weight = Math.max(0, queue.getQueueCapacities().getWeight(nodeLabel)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 79afcdc2aaf..4dd3317e3eb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -46,6 +46,7 @@ public class PlanQueue extends AbstractManagedParentQueue { public PlanQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); + updateAbsoluteCapacities(); this.schedulerContext = cs; // Set the reservation queue attributes for the Plan @@ -100,7 +101,7 @@ public class PlanQueue extends AbstractManagedParentQueue { } // Set new configs - setupQueueConfigs(clusterResource); + setupQueueConfigs(clusterResource, csContext.getConfiguration()); updateQuotas(newlyParsedParentQueue.userLimit, newlyParsedParentQueue.userLimitFactor, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index 46bb0caed3a..86d35d6cdd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -82,7 +82,7 @@ public class QueueCapacities { .append("reserved_cap=" + capacitiesArr[7] + "%, ") .append("abs_reserved_cap=" + capacitiesArr[8] + "%, ") .append("weight=" + capacitiesArr[9] + "w, ") - .append("normalized_weight=" + capacitiesArr[9] + "w}"); + .append("normalized_weight=" + capacitiesArr[10] + "w}"); return sb.toString(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 1ef3a29dbba..300993b9475 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -907,7 +907,12 @@ public class TestCapacitySchedulerAutoQueueCreation @Test public void testDynamicAutoQueueCreationWithTags() throws Exception { - MockRM rm = null; + // This test we will reinitialize mockRM, so stop the previous initialized + // mockRM to avoid issues like MetricsSystem + if (mockRM != null) { + mockRM.stop(); + } + mockRM = null; try { CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); @@ -929,35 +934,35 @@ public class TestCapacitySchedulerAutoQueueCreation RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); mgr.init(csConf); - rm = new MockRM(csConf) { + mockRM = new MockRM(csConf) { @Override public RMNodeLabelsManager createNodeLabelManager() { return mgr; } }; - rm.start(); - MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB); + mockRM.start(); + MockNM nm = mockRM.registerNode("127.0.0.1:1234", 16 * GB); MockRMAppSubmissionData data = - MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) + MockRMAppSubmissionData.Builder.createWithMemory(GB, mockRM) .withAppName("apptodynamicqueue") .withUser("hadoop") .withAcls(null) .withUnmanagedAM(false) .withApplicationTags(Sets.newHashSet("userid=testuser")) .build(); - RMApp app = MockRMAppSubmitter.submit(rm, data); - MockRM.launchAndRegisterAM(app, rm, nm); + RMApp app = MockRMAppSubmitter.submit(mockRM, data); + MockRM.launchAndRegisterAM(app, mockRM, nm); nm.nodeHeartbeat(true); - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); CSQueue queue = cs.getQueue("root.a.testuser"); assertNotNull("Leaf queue has not been auto-created", queue); assertEquals("Number of running applications", 1, queue.getNumApplications()); } finally { - if (rm != null) { - rm.close(); + if (mockRM != null) { + mockRM.close(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java new file mode 100644 index 00000000000..25b2f4d0c4a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -0,0 +1,436 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestCapacitySchedulerNewQueueAutoCreation + extends TestCapacitySchedulerAutoCreatedQueueBase { + private static final Logger LOG = LoggerFactory.getLogger( + org.apache.hadoop.yarn.server.resourcemanager + .scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.class); + public static final int GB = 1024; + private static final int MAX_MEMORY = 1200; + private MockRM mockRM = null; + private CapacityScheduler cs; + private CapacitySchedulerConfiguration csConf; + private CapacitySchedulerAutoQueueHandler autoQueueHandler; + + /* + Create the following structure: + root + / \ + a b + / + a1 + */ + @Before + public void setUp() throws Exception { + csConf = new CapacitySchedulerConfiguration(); + csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + // By default, set 3 queues, a/b, and a.a1 + csConf.setQueues("root", new String[]{"a", "b"}); + csConf.setNonLabeledQueueWeight("root", 1f); + csConf.setNonLabeledQueueWeight("root.a", 1f); + csConf.setNonLabeledQueueWeight("root.b", 1f); + csConf.setQueues("root.a", new String[]{"a1"}); + csConf.setNonLabeledQueueWeight("root.a.a1", 1f); + csConf.setAutoQueueCreationV2Enabled("root", true); + csConf.setAutoQueueCreationV2Enabled("root.a", true); + csConf.setAutoQueueCreationV2Enabled("root.e", true); + } + + private void startScheduler() throws Exception { + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(csConf); + mockRM = new MockRM(csConf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + cs = (CapacityScheduler) mockRM.getResourceScheduler(); + cs.updatePlacementRules(); + mockRM.start(); + cs.start(); + autoQueueHandler = new CapacitySchedulerAutoQueueHandler( + cs.getCapacitySchedulerQueueManager(), csConf); + mockRM.registerNode("h1:1234", MAX_MEMORY * GB); // label = x + } + + /* + Create and validate the following structure: + + root + ┌─────┬────────┬─────┴─────┬─────────┐ + a b c-auto e-auto d-auto + | | + a1 e1-auto + */ + private void createBasicQueueStructureAndValidate() throws Exception { + // queue's weights are 1 + // root + // - a (w=1) + // - b (w=1) + // - c-auto (w=1) + // - d-auto (w=1) + // - e-auto (w=1) + // - e1-auto (w=1) + MockNM nm1 = mockRM.registerNode("h1:1234", 1200 * GB); // label = x + + createQueue("root.c-auto"); + + // Check if queue c-auto got created + CSQueue c = cs.getQueue("root.c-auto"); + Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(400 * GB, + c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Now add another queue-d, in the same hierarchy + createQueue("root.d-auto"); + + // Because queue-d has the same weight of other sibling queue, its abs cap + // become 1/4 + CSQueue d = cs.getQueue("root.d-auto"); + Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(300 * GB, + d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Now we check queue c again, it should also become 1/4 capacity + Assert.assertEquals(1 / 4f, c.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(300 * GB, + c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Now we add a two-level queue, create leaf only + // Now add another queue a2-auto, under root.a + createQueue("root.a.a2-auto"); + + // root.a has 1/4 abs resource, a2/a1 has the same weight, so a2 has 1/8 abs + // capacity + CSQueue a2 = cs.getQueue("root.a.a2-auto"); + Assert.assertEquals(1 / 8f, a2.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, a2.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(150 * GB, + a2.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // try, create leaf + parent, will success + createQueue("root.e-auto.e1-auto"); + + // Now check capacity of e and e1 (under root we have 5 queues, so e1 get + // 1/5 capacity + CSQueue e = cs.getQueue("root.e-auto"); + Assert.assertEquals(1 / 5f, e.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, e.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(240 * GB, + e.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Under e, there's only one queue, so e1/e have same capacity + CSQueue e1 = cs.getQueue("root.e-auto.e1-auto"); + Assert.assertEquals(1 / 5f, e1.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, e1.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(240 * GB, + e1.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + } + + /* + Create and validate the structure: + root + ┌─────┬────────┬─────┴───────┐ + a b c-auto d-auto + | + a1 + */ + @Test + public void testAutoCreateQueueWithSiblingsUnderRoot() throws Exception { + startScheduler(); + + createQueue("root.c-auto"); + + // Check if queue c-auto got created + CSQueue c = cs.getQueue("root.c-auto"); + Assert.assertEquals(1 / 3f, c.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(400 * GB, + c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Now add another queue-d, in the same hierarchy + createQueue("root.d-auto"); + + // Because queue-d has the same weight of other sibling queue, its abs cap + // become 1/4 + CSQueue d = cs.getQueue("root.d-auto"); + Assert.assertEquals(1 / 4f, d.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(300 * GB, + d.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + + // Now we check queue c again, it should also become 1/4 capacity + Assert.assertEquals(1 / 4f, c.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, c.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(300 * GB, + c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + } + + /* + Create and validate the structure: + root + ┌─────┴─────┐ + b a + / \ + a1 a2-auto + */ + @Test + public void testAutoCreateQueueStaticParentOneLevel() throws Exception { + startScheduler(); + // Now we add a two-level queue, create leaf only + // Now add another queue a2-auto, under root.a + createQueue("root.a.a2-auto"); + + // root.a has 1/2 abs resource, a2/a1 has the same weight, so a2 has 1/4 abs + // capacity + CSQueue a2 = cs.getQueue("root.a.a2-auto"); + Assert.assertEquals(1 / 4f, a2.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, a2.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(MAX_MEMORY * (1 / 4f) * GB, + a2.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(), + 1e-6); + + } + + /* + Create and validate the structure: + root + ┌─────┴─────┐ + b a + | \ + a1 a2-auto + | \ + a3-auto a4-auto + */ + @Test + public void testAutoCreateQueueAutoParentTwoLevelsWithSiblings() + throws Exception { + startScheduler(); + csConf.setAutoQueueCreationV2Enabled("root.a.a2-auto", true); + + // root.a has 1/2 abs resource -> a1 and a2-auto same weight 1/4 + // -> a3-auto is alone with weight 1/4 + createQueue("root.a.a2-auto.a3-auto"); + CSQueue a3 = cs.getQueue("root.a.a2-auto.a3-auto"); + Assert.assertEquals(1 / 4f, a3.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, a3.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(MAX_MEMORY * (1 / 4f) * GB, + a3.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(), + 1e-6); + + // root.a has 1/2 abs resource -> a1 and a2-auto same weight 1/4 + // -> a3-auto and a4-auto same weight 1/8 + createQueue("root.a.a2-auto.a4-auto"); + CSQueue a4 = cs.getQueue("root.a.a2-auto.a4-auto"); + Assert.assertEquals(1 / 8f, a3.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(1f, a3.getQueueCapacities().getWeight(), 1e-6); + Assert.assertEquals(MAX_MEMORY * (1 / 8f) * GB, + a4.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize(), + 1e-6); + } + + @Test(expected = SchedulerDynamicEditException.class) + public void testAutoCreateQueueShouldFailWhenNonParentQueue() + throws Exception { + startScheduler(); + createQueue("root.a.a1.a2-auto"); + } + + @Test(expected = SchedulerDynamicEditException.class) + public void testAutoCreateQueueWhenSiblingsNotInWeightMode() + throws Exception { + startScheduler(); + csConf.setCapacity("root.a", 50f); + csConf.setCapacity("root.b", 50f); + csConf.setCapacity("root.a.a1", 100f); + cs.reinitialize(csConf, mockRM.getRMContext()); + createQueue("root.a.a2-auto"); + } + + @Test(expected = SchedulerDynamicEditException.class) + public void testAutoCreateQueueShouldFailIfDepthIsAboveLimit() + throws Exception { + startScheduler(); + createQueue("root.a.a3-auto.a4-auto.a5-auto"); + } + + @Test(expected = SchedulerDynamicEditException.class) + public void testAutoCreateQueueShouldFailIfNotEnabledForParent() + throws Exception { + startScheduler(); + csConf.setAutoQueueCreationV2Enabled("root", false); + cs.reinitialize(csConf, mockRM.getRMContext()); + createQueue("root.c-auto"); + } + + @Test + public void testAutoCreateQueueRefresh() throws Exception { + startScheduler(); + + createBasicQueueStructureAndValidate(); + + // Refresh the queue to make sure all queues are still exist. + // (Basically, dynamic queues should not disappear after refresh). + cs.reinitialize(csConf, mockRM.getRMContext()); + + // Double confirm, after refresh, we should still see root queue has 5 + // children. + Assert.assertEquals(5, cs.getQueue("root").getChildQueues().size()); + Assert.assertNotNull(cs.getQueue("root.c-auto")); + } + + @Test + public void testConvertDynamicToStaticQueue() throws Exception { + startScheduler(); + + createBasicQueueStructureAndValidate(); + + // Now, update root.a's weight to 6 + csConf.setNonLabeledQueueWeight("root.a", 6f); + cs.reinitialize(csConf, mockRM.getRMContext()); + + // Double confirm, after refresh, we should still see root queue has 5 + // children. + Assert.assertEquals(5, cs.getQueue("root").getChildQueues().size()); + + // Get queue a + CSQueue a = cs.getQueue("root.a"); + + // a's abs resource should be 6/10, (since a.weight=6, all other 4 peers + // have weight=1). + Assert.assertEquals(6 / 10f, a.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(720 * GB, + a.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + Assert.assertEquals(6f, a.getQueueCapacities().getWeight(), 1e-6); + + // Set queue c-auto's weight to 6, and mark c-auto to be static queue + csConf.setQueues("root", new String[]{"a", "b", "c-auto"}); + csConf.setNonLabeledQueueWeight("root.c-auto", 6f); + cs.reinitialize(csConf, mockRM.getRMContext()); + + // Get queue c + CSQueue c = cs.getQueue("root.c-auto"); + + // c's abs resource should be 6/15, (since a/c.weight=6, all other 3 peers + // have weight=1). + Assert.assertEquals(6 / 15f, c.getAbsoluteCapacity(), 1e-6); + Assert.assertEquals(480 * GB, + c.getQueueResourceQuotas().getEffectiveMinResource().getMemorySize()); + Assert.assertEquals(6f, c.getQueueCapacities().getWeight(), 1e-6); + + // First, create e2-auto queue + createQueue("root.e-auto.e2-auto"); + + // Do change 2nd level queue from dynamic to static + csConf.setQueues("root", new String[]{"a", "b", "c-auto", "e-auto"}); + csConf.setNonLabeledQueueWeight("root.e-auto", 6f); + csConf.setQueues("root.e-auto", new String[]{"e1-auto"}); + csConf.setNonLabeledQueueWeight("root.e-auto.e1-auto", 6f); + cs.reinitialize(csConf, mockRM.getRMContext()); + + // Get queue e1 + CSQueue e1 = cs.getQueue("root.e-auto.e1-auto"); + + // e's abs resource should be 6/20 * (6/7), + // (since a/c/e.weight=6, all other 2 peers + // have weight=1, and e1's weight is 6, e2's weight is 1). + float e1NormalizedWeight = (6 / 20f) * (6 / 7f); + Assert.assertEquals(e1NormalizedWeight, e1.getAbsoluteCapacity(), 1e-6); + assertQueueMinResource(e1, MAX_MEMORY * e1NormalizedWeight); + Assert.assertEquals(6f, e1.getQueueCapacities().getWeight(), 1e-6); + } + + /* + Create the structure and convert d-auto to static and leave d1-auto as dynamic + root + ┌─────┬─────────────┴──────┐ + a b d-auto + | | + a1 d1-auto + */ + @Test + public void testConvertDynamicParentToStaticParent() throws Exception { + startScheduler(); + createQueue("root.d-auto.d1-auto"); + csConf.setQueues("root", new String[]{"a", "b", "d-auto"}); + csConf.setNonLabeledQueueWeight("root.a", 6f); + csConf.setNonLabeledQueueWeight("root.d-auto", 1f); + cs.reinitialize(csConf, mockRM.getRMContext()); + + CSQueue d = cs.getQueue("root.d-auto"); + + Assert.assertEquals(1 / 8f, d.getAbsoluteCapacity(), 1e-6); + assertQueueMinResource(d, MAX_MEMORY * (1 / 8f)); + Assert.assertEquals(1f, d.getQueueCapacities().getWeight(), 1e-6); + + CSQueue d1 = cs.getQueue("root.d-auto.d1-auto"); + Assert.assertEquals(1 / 8f, d1.getAbsoluteCapacity(), 1e-6); + assertQueueMinResource(d1, MAX_MEMORY * (1 / 8f)); + Assert.assertEquals(1f, d1.getQueueCapacities().getWeight(), 1e-6); + } + + @Test + public void testAutoQueueCreationOnAppSubmission() throws Exception { + startScheduler(); + createBasicQueueStructureAndValidate(); + + submitApp(cs, USER0, USER0, "root.e-auto"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e-auto"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue( + "root.e-auto." + USER0); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + } + + private LeafQueue createQueue(String queuePath) throws YarnException { + return autoQueueHandler.autoCreateQueue( + CSQueueUtils.extractQueuePath(queuePath)); + } + + private void assertQueueMinResource(CSQueue queue, float expected) { + Assert.assertEquals(Math.round(expected * GB), + queue.getQueueResourceQuotas().getEffectiveMinResource() + .getMemorySize(), 1e-6); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 3a6fe2a8521..0c9799d932b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -3291,7 +3291,11 @@ public class TestLeafQueue { newQueues, queues, TestUtils.spyHook); queues = newQueues; + // This will not update active apps root.reinitialize(newRoot, csContext.getClusterResource()); + // Cause this to update active apps + root.updateClusterResource(csContext.getClusterResource(), + new ResourceLimits(csContext.getClusterResource())); // after reinitialization assertEquals(3, e.getNumActiveApplications());