From 8ffe86f7804af9446e2143ca0eee94fca1240c68 Mon Sep 17 00:00:00 2001 From: Li Lu Date: Wed, 30 Nov 2016 13:38:42 -0800 Subject: [PATCH] YARN-5761. Separate QueueManager from Scheduler. (Xuan Gong via gtcarrera9) (cherry picked from commit 69fb70c31aa277f7fb14b05c0185ddc5cd90793d) --- .../scheduler/SchedulerQueueManager.java | 75 ++++ .../scheduler/capacity/CapacityScheduler.java | 294 ++------------ .../CapacitySchedulerQueueManager.java | 361 ++++++++++++++++++ .../capacity/TestApplicationLimits.java | 35 +- .../TestApplicationLimitsByPartition.java | 7 +- .../capacity/TestChildQueueOrder.java | 9 +- .../scheduler/capacity/TestLeafQueue.java | 9 +- .../scheduler/capacity/TestParentQueue.java | 39 +- .../scheduler/capacity/TestReservations.java | 8 +- .../scheduler/capacity/TestUtils.java | 2 +- 10 files changed, 536 insertions(+), 303 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java new file mode 100644 index 00000000000..92b989a8b25 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java @@ -0,0 +1,75 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import java.io.IOException; +import java.util.Map; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; + +/** + * + * Context of the Queues in Scheduler. + * + */ +@Private +@Unstable +public interface SchedulerQueueManager { + + /** + * Get the root queue. + * @return root queue + */ + T getRootQueue(); + + /** + * Get all the queues. + * @return a map contains all the queues as well as related queue names + */ + Map getQueues(); + + /** + * Remove the queue from the existing queue. + * @param queueName the queue name + */ + void removeQueue(String queueName); + + /** + * Add a new queue to the existing queues. + * @param queueName the queue name + * @param queue the queue object + */ + void addQueue(String queueName, T queue); + + /** + * Get a queue matching the specified queue name. + * @param queueName the queue name + * @return a queue object + */ + T getQueue(String queueName); + + /** + * Reinitialize the queues. + * @param newConf the configuration + * @throws IOException if fails to re-initialize queues + */ + void reinitializeQueues(E newConf) throws IOException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 77868e95051..489ae422296 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -68,8 +67,6 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; -import org.apache.hadoop.yarn.security.Permission; -import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; @@ -157,9 +154,9 @@ public class CapacityScheduler extends ResourceAllocationCommitter { private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); - private YarnAuthorizationProvider authorizer; - private CSQueue root; + private CapacitySchedulerQueueManager queueManager; + // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -169,22 +166,6 @@ public class CapacityScheduler extends private int offswitchPerHeartbeatLimit; - static final Comparator nonPartitionedQueueComparator = - new Comparator() { - @Override - public int compare(CSQueue q1, CSQueue q2) { - if (q1.getUsedCapacity() < q2.getUsedCapacity()) { - return -1; - } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) { - return 1; - } - - return q1.getQueuePath().compareTo(q2.getQueuePath()); - } - }; - - static final PartitionedQueueComparator partitionedQueueComparator = - new PartitionedQueueComparator(); @Override public void setConf(Configuration conf) { @@ -237,8 +218,6 @@ public Configuration getConf() { private CapacitySchedulerConfiguration conf; private Configuration yarnConf; - private Map queues = new ConcurrentHashMap(); - private ResourceCalculator calculator; private boolean usePortForNodeName; @@ -262,11 +241,11 @@ public CapacityScheduler() { @Override public QueueMetrics getRootQueueMetrics() { - return root.getMetrics(); + return getRootQueue().getMetrics(); } public CSQueue getRootQueue() { - return root; + return queueManager.getRootQueue(); } @Override @@ -291,12 +270,12 @@ public void setResourceCalculator(ResourceCalculator rc) { @Override public Comparator getNonPartitionedQueueComparator() { - return nonPartitionedQueueComparator; + return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR; } @Override public PartitionedQueueComparator getPartitionedQueueComparator() { - return partitionedQueueComparator; + return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR; } @Override @@ -327,7 +306,10 @@ void initScheduler(Configuration configuration) throws this.usePortForNodeName = this.conf.getUsePortForNodeName(); this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); - authorizer = YarnAuthorizationProvider.getInstance(yarnConf); + this.queueManager = new CapacitySchedulerQueueManager(yarnConf, + this.labelManager); + this.queueManager.setCapacitySchedulerContext(this); + this.activitiesManager = new ActivitiesManager(rmContext); activitiesManager.init(conf); initializeQueues(this.conf); @@ -555,13 +537,6 @@ public int getPendingBacklogs() { } } - static class QueueHook { - public CSQueue hook(CSQueue queue) { - return queue; - } - } - private static final QueueHook noop = new QueueHook(); - @VisibleForTesting public UserGroupMappingPlacementRule getUserGroupMappingPlacementRule() throws IOException { @@ -579,7 +554,7 @@ public CSQueue hook(CSQueue queue) { if (!mappingQueue.equals( UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { - CSQueue queue = queues.get(mappingQueue); + CSQueue queue = getQueue(mappingQueue); if (queue == null || !(queue instanceof LeafQueue)) { throw new IOException( "mapping contains invalid or non-leaf queue " + mappingQueue); @@ -617,184 +592,29 @@ private void updatePlacementRules() throws IOException { private void initializeQueues(CapacitySchedulerConfiguration conf) throws IOException { - root = - parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - queues, queues, noop); - labelManager.reinitializeQueueLabels(getQueueToLabels()); - LOG.info("Initialized root queue " + root); + this.queueManager.initializeQueues(conf); + updatePlacementRules(); - setQueueAcls(authorizer, queues); // Notify Preemption Manager - preemptionManager.refreshQueues(null, root); + preemptionManager.refreshQueues(null, this.getRootQueue()); } @Lock(CapacityScheduler.class) private void reinitializeQueues(CapacitySchedulerConfiguration newConf) throws IOException { - // Parse new queues - Map newQueues = new HashMap(); - CSQueue newRoot = - parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); - - // Ensure all existing queues are still present - validateExistingQueues(queues, newQueues); - - // Add new queues - addNewQueues(queues, newQueues); - - // Re-configure queues - root.reinitialize(newRoot, getClusterResource()); + this.queueManager.reinitializeQueues(newConf); updatePlacementRules(); - // Re-calculate headroom for active applications - Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, new ResourceLimits( - clusterResource)); - - labelManager.reinitializeQueueLabels(getQueueToLabels()); - setQueueAcls(authorizer, queues); - // Notify Preemption Manager - preemptionManager.refreshQueues(null, root); - } - - @VisibleForTesting - public static void setQueueAcls(YarnAuthorizationProvider authorizer, - Map queues) throws IOException { - List permissions = new ArrayList<>(); - for (CSQueue queue : queues.values()) { - AbstractCSQueue csQueue = (AbstractCSQueue) queue; - permissions.add( - new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); - } - authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser()); - } - - private Map> getQueueToLabels() { - Map> queueToLabels = new HashMap>(); - for (CSQueue queue : queues.values()) { - queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels()); - } - return queueToLabels; - } - - /** - * Ensure all existing queues are present. Queues cannot be deleted - * @param queues existing queues - * @param newQueues new queues - */ - @Lock(CapacityScheduler.class) - private void validateExistingQueues( - Map queues, Map newQueues) - throws IOException { - // check that all static queues are included in the newQueues list - for (Map.Entry e : queues.entrySet()) { - if (!(e.getValue() instanceof ReservationQueue)) { - String queueName = e.getKey(); - CSQueue oldQueue = e.getValue(); - CSQueue newQueue = newQueues.get(queueName); - if (null == newQueue) { - throw new IOException(queueName + " cannot be found during refresh!"); - } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { - throw new IOException(queueName + " is moved from:" - + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() - + " after refresh, which is not allowed."); - } - } - } - } - - /** - * Add the new queues (only) to our list of queues... - * ... be careful, do not overwrite existing queues. - * @param queues - * @param newQueues - */ - @Lock(CapacityScheduler.class) - private void addNewQueues( - Map queues, Map newQueues) - { - for (Map.Entry e : newQueues.entrySet()) { - String queueName = e.getKey(); - CSQueue queue = e.getValue(); - if (!queues.containsKey(queueName)) { - queues.put(queueName, queue); - } - } - } - - @Lock(CapacityScheduler.class) - static CSQueue parseQueue( - CapacitySchedulerContext csContext, - CapacitySchedulerConfiguration conf, - CSQueue parent, String queueName, Map queues, - Map oldQueues, - QueueHook hook) throws IOException { - CSQueue queue; - String fullQueueName = - (parent == null) ? queueName - : (parent.getQueuePath() + "." + queueName); - String[] childQueueNames = - conf.getQueues(fullQueueName); - boolean isReservableQueue = conf.isReservable(fullQueueName); - if (childQueueNames == null || childQueueNames.length == 0) { - if (null == parent) { - throw new IllegalStateException( - "Queue configuration missing child queue names for " + queueName); - } - // Check if the queue will be dynamically managed by the Reservation - // system - if (isReservableQueue) { - queue = - new PlanQueue(csContext, queueName, parent, - oldQueues.get(queueName)); - } else { - queue = - new LeafQueue(csContext, queueName, parent, - oldQueues.get(queueName)); - - // Used only for unit tests - queue = hook.hook(queue); - } - } else { - if (isReservableQueue) { - throw new IllegalStateException( - "Only Leaf Queues can be reservable for " + queueName); - } - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); - - // Used only for unit tests - queue = hook.hook(parentQueue); - - List childQueues = new ArrayList(); - for (String childQueueName : childQueueNames) { - CSQueue childQueue = - parseQueue(csContext, conf, queue, childQueueName, - queues, oldQueues, hook); - childQueues.add(childQueue); - } - parentQueue.setChildQueues(childQueues); - } - - if (queue instanceof LeafQueue && queues.containsKey(queueName) - && queues.get(queueName) instanceof LeafQueue) { - throw new IOException("Two leaf queues were named " + queueName - + ". Leaf queue names must be distinct"); - } - queues.put(queueName, queue); - - LOG.info("Initialized queue: " + queue); - return queue; + preemptionManager.refreshQueues(null, this.getRootQueue()); } public CSQueue getQueue(String queueName) { if (queueName == null) { return null; } - return queues.get(queueName); + return this.queueManager.getQueue(queueName); } private void addApplicationOnRecovery( @@ -1048,7 +868,7 @@ private void doneApplicationAttempt( // Inform the queue String queueName = attempt.getQueue().getQueueName(); - CSQueue queue = queues.get(queueName); + CSQueue queue = this.getQueue(queueName); if (!(queue instanceof LeafQueue)) { LOG.error( "Cannot finish application " + "from non-leaf queue: " + queueName); @@ -1175,7 +995,7 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, boolean recursive) throws IOException { CSQueue queue = null; - queue = this.queues.get(queueName); + queue = this.getQueue(queueName); if (queue == null) { throw new IOException("Unknown queue: " + queueName); } @@ -1193,7 +1013,7 @@ public List getQueueUserAclInfo() { return new ArrayList(); } - return root.getQueueUserAclInfo(user); + return getRootQueue().getQueueUserAclInfo(user); } @Override @@ -1236,7 +1056,7 @@ private void updateNodeAndQueueResource(RMNode nm, writeLock.lock(); updateNodeResource(nm, resourceOption); Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, + getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); } finally { writeLock.unlock(); @@ -1472,8 +1292,8 @@ private CSAssignment allocateContainerOnSingleNode(PlacementSet ps, boolean withNodeHeartbeat) { - CSAssignment assignment = root.assignContainers(getClusterResource(), ps, - new ResourceLimits(labelManager + CSAssignment assignment = getRootQueue().assignContainers( + getClusterResource(), ps, new ResourceLimits(labelManager .getResourceByLabel(ps.getPartition(), getClusterResource())), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); @@ -1507,7 +1327,7 @@ private CSAssignment allocateOrReserveNewContainers( } // Try to use NON_EXCLUSIVE - assignment = root.assignContainers(getClusterResource(), ps, + assignment = getRootQueue().assignContainers(getClusterResource(), ps, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager @@ -1527,8 +1347,8 @@ private CSAssignment allocateContainersOnMultiNodes( PlacementSet ps) { // When this time look at multiple nodes, try schedule if the // partition has any available resource or killable resource - if (root.getQueueCapacities().getUsedCapacity(ps.getPartition()) >= 1.0f - && preemptionManager.getKillableResource( + if (getRootQueue().getQueueCapacities().getUsedCapacity( + ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource( CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources .none()) { if (LOG.isDebugEnabled()) { @@ -1711,7 +1531,7 @@ private void updateNodeLabelsAndQueueResource( updateLabelsOnNode(id, labels); } Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, + getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); } finally { writeLock.unlock(); @@ -1732,7 +1552,7 @@ private void addNode(RMNode nodeManager) { } Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, + getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); LOG.info( @@ -1783,7 +1603,7 @@ private void removeNode(RMNode nodeInfo) { nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); - root.updateClusterResource(clusterResource, + getRootQueue().updateClusterResource(clusterResource, new ResourceLimits(clusterResource)); int numNodes = nodeTracker.nodeCount(); @@ -2021,7 +1841,7 @@ public boolean checkAccess(UserGroupInformation callerUGI, @Override public List getAppsInQueue(String queueName) { - CSQueue queue = queues.get(queueName); + CSQueue queue = getQueue(queueName); if (queue == null) { return null; } @@ -2031,7 +1851,8 @@ public List getAppsInQueue(String queueName) { } public boolean isSystemAppsLimitReached() { - if (root.getNumApplications() < conf.getMaximumSystemApplications()) { + if (getRootQueue().getNumApplications() < conf + .getMaximumSystemApplications()) { return false; } return true; @@ -2132,7 +1953,7 @@ public void removeQueue(String queueName) } ((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q); - this.queues.remove(queueName); + this.queueManager.removeQueue(queueName); LOG.info("Removal of ReservationQueue " + queueName + " has succeeded"); } finally { writeLock.unlock(); @@ -2161,7 +1982,7 @@ public void addQueue(Queue queue) PlanQueue parentPlan = (PlanQueue) newQueue.getParent(); String queuename = newQueue.getQueueName(); parentPlan.addChildQueue(newQueue); - this.queues.put(queuename, newQueue); + this.queueManager.addQueue(queuename, newQueue); LOG.info("Creation of ReservationQueue " + newQueue + " succeeded"); } finally { writeLock.unlock(); @@ -2173,7 +1994,7 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) throws YarnException { try { writeLock.lock(); - LeafQueue queue = getAndCheckLeafQueue(inQueue); + LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); ParentQueue parent = (ParentQueue) queue.getParent(); if (!(queue instanceof ReservationQueue)) { @@ -2225,9 +2046,10 @@ public String moveApplication(ApplicationId appId, FiCaSchedulerApp app = getApplicationAttempt( ApplicationAttemptId.newInstance(appId, 0)); String sourceQueueName = app.getQueue().getQueueName(); - LeafQueue source = getAndCheckLeafQueue(sourceQueueName); + LeafQueue source = this.queueManager.getAndCheckLeafQueue( + sourceQueueName); String destQueueName = handleMoveToPlanQueue(targetQueueName); - LeafQueue dest = getAndCheckLeafQueue(destQueueName); + LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName); // Validation check - ACLs, submission limits for user & queue String user = app.getUser(); checkQueuePartition(app, dest); @@ -2291,27 +2113,6 @@ private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest) } } - /** - * Check that the String provided in input is the name of an existing, - * LeafQueue, if successful returns the queue. - * - * @param queue - * @return the LeafQueue - * @throws YarnException - */ - private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { - CSQueue ret = this.getQueue(queue); - if (ret == null) { - throw new YarnException("The specified Queue: " + queue - + " doesn't exist"); - } - if (!(ret instanceof LeafQueue)) { - throw new YarnException("The specified Queue: " + queue - + " is not a Leaf Queue. Move is supported only for Leaf Queues."); - } - return (LeafQueue) ret; - } - /** {@inheritDoc} */ @Override public EnumSet getSchedulingResourceTypes() { @@ -2348,7 +2149,7 @@ private String handleMoveToPlanQueue(String targetQueueName) { @Override public Set getPlanQueues() { Set ret = new HashSet(); - for (Map.Entry l : queues.entrySet()) { + for (Map.Entry l : queueManager.getQueues().entrySet()) { if (l.getValue() instanceof PlanQueue) { ret.add(l.getKey()); } @@ -2368,7 +2169,8 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, if (null == priorityFromContext) { // Get the default priority for the Queue. If Queue is non-existent, then // use default priority - priorityFromContext = getDefaultPriorityForQueue(queueName); + priorityFromContext = this.queueManager.getDefaultPriorityForQueue( + queueName); LOG.info("Application '" + applicationId + "' is submitted without priority " @@ -2392,18 +2194,6 @@ public Priority checkAndGetApplicationPriority(Priority priorityFromContext, return appPriority; } - private Priority getDefaultPriorityForQueue(String queueName) { - Queue queue = getQueue(queueName); - if (null == queue || null == queue.getDefaultApplicationPriority()) { - // Return with default application priority - return Priority.newInstance(CapacitySchedulerConfiguration - .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); - } - - return Priority.newInstance(queue.getDefaultApplicationPriority() - .getPriority()); - } - @Override public Priority updateApplicationPriority(Priority newPriority, ApplicationId applicationId, SettableFuture future) @@ -2457,7 +2247,7 @@ public PreemptionManager getPreemptionManager() { @Override public ResourceUsage getClusterResourceUsage() { - return root.getQueueResourceUsage(); + return getRootQueue().getQueueResourceUsage(); } private SchedulerContainer getSchedulerContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java new file mode 100644 index 00000000000..7a6ce567f87 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -0,0 +1,361 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.security.Permission; +import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager; + +/** + * + * Context of the Queues in Capacity Scheduler. + * + */ +@Private +@Unstable +public class CapacitySchedulerQueueManager implements SchedulerQueueManager< + CSQueue, CapacitySchedulerConfiguration>{ + + private static final Log LOG = LogFactory.getLog( + CapacitySchedulerQueueManager.class); + + static final Comparator NON_PARTITIONED_QUEUE_COMPARATOR = + new Comparator() { + @Override + public int compare(CSQueue q1, CSQueue q2) { + if (q1.getUsedCapacity() < q2.getUsedCapacity()) { + return -1; + } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) { + return 1; + } + + return q1.getQueuePath().compareTo(q2.getQueuePath()); + } + }; + + static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR = + new PartitionedQueueComparator(); + + static class QueueHook { + public CSQueue hook(CSQueue queue) { + return queue; + } + } + + private static final QueueHook NOOP = new QueueHook(); + private CapacitySchedulerContext csContext; + private final YarnAuthorizationProvider authorizer; + private final Map queues = new ConcurrentHashMap<>(); + private CSQueue root; + private final RMNodeLabelsManager labelManager; + + /** + * Construct the service. + * @param conf the configuration + * @param labelManager the labelManager + */ + public CapacitySchedulerQueueManager(Configuration conf, + RMNodeLabelsManager labelManager) { + this.authorizer = YarnAuthorizationProvider.getInstance(conf); + this.labelManager = labelManager; + } + + @Override + public CSQueue getRootQueue() { + return this.root; + } + + @Override + public Map getQueues() { + return queues; + } + + @Override + public void removeQueue(String queueName) { + this.queues.remove(queueName); + } + + @Override + public void addQueue(String queueName, CSQueue queue) { + this.queues.put(queueName, queue); + } + + @Override + public CSQueue getQueue(String queueName) { + return queues.get(queueName); + } + + /** + * Set the CapacitySchedulerContext. + * @param capacitySchedulerContext the CapacitySchedulerContext + */ + public void setCapacitySchedulerContext( + CapacitySchedulerContext capacitySchedulerContext) { + this.csContext = capacitySchedulerContext; + } + + /** + * Initialized the queues. + * @param conf the CapacitySchedulerConfiguration + * @throws IOException if fails to initialize queues + */ + public void initializeQueues(CapacitySchedulerConfiguration conf) + throws IOException { + root = parseQueue(this.csContext, conf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); + setQueueAcls(authorizer, queues); + labelManager.reinitializeQueueLabels(getQueueToLabels()); + LOG.info("Initialized root queue " + root); + } + + @Override + public void reinitializeQueues(CapacitySchedulerConfiguration newConf) + throws IOException { + // Parse new queues + Map newQueues = new HashMap<>(); + CSQueue newRoot = parseQueue(this.csContext, newConf, null, + CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP); + + // Ensure all existing queues are still present + validateExistingQueues(queues, newQueues); + + // Add new queues + addNewQueues(queues, newQueues); + + // Re-configure queues + root.reinitialize(newRoot, this.csContext.getClusterResource()); + + setQueueAcls(authorizer, queues); + + // Re-calculate headroom for active applications + Resource clusterResource = this.csContext.getClusterResource(); + root.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); + + labelManager.reinitializeQueueLabels(getQueueToLabels()); + } + + /** + * Parse the queue from the configuration. + * @param csContext the CapacitySchedulerContext + * @param conf the CapacitySchedulerConfiguration + * @param parent the parent queue + * @param queueName the queue name + * @param queues all the queues + * @param oldQueues the old queues + * @param hook the queue hook + * @return the CSQueue + * @throws IOException + */ + static CSQueue parseQueue( + CapacitySchedulerContext csContext, + CapacitySchedulerConfiguration conf, + CSQueue parent, String queueName, Map queues, + Map oldQueues, + QueueHook hook) throws IOException { + CSQueue queue; + String fullQueueName = + (parent == null) ? queueName + : (parent.getQueuePath() + "." + queueName); + String[] childQueueNames = conf.getQueues(fullQueueName); + boolean isReservableQueue = conf.isReservable(fullQueueName); + if (childQueueNames == null || childQueueNames.length == 0) { + if (null == parent) { + throw new IllegalStateException( + "Queue configuration missing child queue names for " + queueName); + } + // Check if the queue will be dynamically managed by the Reservation + // system + if (isReservableQueue) { + queue = + new PlanQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else { + queue = + new LeafQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + + // Used only for unit tests + queue = hook.hook(queue); + } + } else { + if (isReservableQueue) { + throw new IllegalStateException( + "Only Leaf Queues can be reservable for " + queueName); + } + ParentQueue parentQueue = + new ParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + + // Used only for unit tests + queue = hook.hook(parentQueue); + + List childQueues = new ArrayList<>(); + for (String childQueueName : childQueueNames) { + CSQueue childQueue = + parseQueue(csContext, conf, queue, childQueueName, + queues, oldQueues, hook); + childQueues.add(childQueue); + } + parentQueue.setChildQueues(childQueues); + } + + if (queue instanceof LeafQueue && queues.containsKey(queueName) + && queues.get(queueName) instanceof LeafQueue) { + throw new IOException("Two leaf queues were named " + queueName + + ". Leaf queue names must be distinct"); + } + queues.put(queueName, queue); + + LOG.info("Initialized queue: " + queue); + return queue; + } + + /** + * Ensure all existing queues are present. Queues cannot be deleted + * @param queues existing queues + * @param newQueues new queues + */ + private void validateExistingQueues( + Map queues, Map newQueues) + throws IOException { + // check that all static queues are included in the newQueues list + for (Map.Entry e : queues.entrySet()) { + if (!(e.getValue() instanceof ReservationQueue)) { + String queueName = e.getKey(); + CSQueue oldQueue = e.getValue(); + CSQueue newQueue = newQueues.get(queueName); + if (null == newQueue) { + throw new IOException(queueName + " cannot be found during refresh!"); + } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { + throw new IOException(queueName + " is moved from:" + + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() + + " after refresh, which is not allowed."); + } + } + } + } + + /** + * Add the new queues (only) to our list of queues... + * ... be careful, do not overwrite existing queues. + * @param queues the existing queues + * @param newQueues the new queues + */ + private void addNewQueues( + Map queues, Map newQueues) { + for (Map.Entry e : newQueues.entrySet()) { + String queueName = e.getKey(); + CSQueue queue = e.getValue(); + if (!queues.containsKey(queueName)) { + queues.put(queueName, queue); + } + } + } + + @VisibleForTesting + /** + * Set the acls for the queues. + * @param authorizer the yarnAuthorizationProvider + * @param queues the queues + * @throws IOException if fails to set queue acls + */ + public static void setQueueAcls(YarnAuthorizationProvider authorizer, + Map queues) throws IOException { + List permissions = new ArrayList<>(); + for (CSQueue queue : queues.values()) { + AbstractCSQueue csQueue = (AbstractCSQueue) queue; + permissions.add( + new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs())); + } + authorizer.setPermission(permissions, + UserGroupInformation.getCurrentUser()); + } + + /** + * Check that the String provided in input is the name of an existing, + * LeafQueue, if successful returns the queue. + * + * @param queue the queue name + * @return the LeafQueue + * @throws YarnException if the queue does not exist or the queue + * is not the type of LeafQueue. + */ + public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { + CSQueue ret = this.getQueue(queue); + if (ret == null) { + throw new YarnException("The specified Queue: " + queue + + " doesn't exist"); + } + if (!(ret instanceof LeafQueue)) { + throw new YarnException("The specified Queue: " + queue + + " is not a Leaf Queue."); + } + return (LeafQueue) ret; + } + + /** + * Get the default priority of the queue. + * @param queueName the queue name + * @return the default priority of the queue + */ + public Priority getDefaultPriorityForQueue(String queueName) { + Queue queue = getQueue(queueName); + if (null == queue || null == queue.getDefaultApplicationPriority()) { + // Return with default application priority + return Priority.newInstance(CapacitySchedulerConfiguration + .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY); + } + return Priority.newInstance(queue.getDefaultApplicationPriority() + .getPriority()); + } + + /** + * Get a map of queueToLabels. + * @return the map of queueToLabels + */ + private Map> getQueueToLabels() { + Map> queueToLabels = new HashMap<>(); + for (CSQueue queue : getQueues().values()) { + queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels()); + } + return queueToLabels; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 1c38bc285c5..0ec0260471f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -111,7 +111,8 @@ public void setUp() throws IOException { when(csContext.getClusterResource()). thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -123,9 +124,9 @@ public void setUp() throws IOException { containerTokenSecretManager); Map queues = new HashMap(); - CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, + CSQueue root = CapacitySchedulerQueueManager + .parseQueue(csContext, csConf, null, "root", + queues, queues, TestUtils.spyHook); @@ -276,7 +277,8 @@ public void testLimitsComputation() throws Exception { when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB, 16)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); @@ -288,8 +290,8 @@ public void testLimitsComputation() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + "root", queues, queues, TestUtils.spyHook); LeafQueue queue = (LeafQueue)queues.get(A); @@ -356,9 +358,9 @@ public void testLimitsComputation() throws Exception { + ".maximum-am-resource-percent", 0.5f); // Re-create queues to get new configs. queues = new HashMap(); - root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + root = CapacitySchedulerQueueManager.parseQueue( + csContext, csConf, null, "root", + queues, queues, TestUtils.spyHook); clusterResource = Resources.createResource(100 * 16 * GB); queue = (LeafQueue)queues.get(A); @@ -378,9 +380,9 @@ public void testLimitsComputation() throws Exception { 9999); // Re-create queues to get new configs. queues = new HashMap(); - root = - CapacityScheduler.parseQueue(csContext, csConf, null, "root", - queues, queues, TestUtils.spyHook); + root = CapacitySchedulerQueueManager.parseQueue( + csContext, csConf, null, "root", + queues, queues, TestUtils.spyHook); queue = (LeafQueue)queues.get(A); assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath())); @@ -580,7 +582,8 @@ public void testHeadroom() throws Exception { when(csContext.getMaximumResourceCapability()). thenReturn(Resources.createResource(16*GB)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -589,8 +592,8 @@ public void testHeadroom() throws Exception { when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); - CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, - "root", queues, queues, TestUtils.spyHook); + CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext, + csConf, null, "root", queues, queues, TestUtils.spyHook); ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage(); when(csContext.getClusterResourceUsage()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index d33555265d9..5c53fda5604 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -595,7 +595,8 @@ public void testHeadroom() throws Exception { when(csContext.getMaximumResourceCapability()) .thenReturn(Resources.createResource(16 * GB)); when(csContext.getNonPartitionedQueueComparator()) - .thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + .thenReturn( + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); RMContext rmContext = TestUtils.getMockRMContext(); RMContext spyRMContext = spy(rmContext); @@ -614,8 +615,8 @@ public void testHeadroom() throws Exception { when(csContext.getClusterResource()).thenReturn(clusterResource); Map queues = new HashMap(); - CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null, - "root", queues, queues, TestUtils.spyHook); + CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext, + csConf, null, "root", queues, queues, TestUtils.spyHook); ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage(); when(csContext.getClusterResourceUsage()) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 40e5d2a4a36..a6ae0c2d4e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -95,11 +95,12 @@ public void setUp() throws Exception { when(csContext.getMaximumResourceCapability()).thenReturn( Resources.createResource(16*GB, 32)); when(csContext.getClusterResource()). - thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); + thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()). - thenReturn(resourceComparator); + thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); } @@ -222,7 +223,7 @@ public void testSortedQueues() throws Exception { setupSortedQueues(csConf); Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index ed47c303607..b419c4a3329 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -175,7 +175,8 @@ private void setUpInternal(ResourceCalculator rC) throws Exception { when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); @@ -188,7 +189,7 @@ private void setUpInternal(ResourceCalculator rC) throws Exception { containerTokenSecretManager); root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -2380,7 +2381,7 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception { .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2); Map newQueues = new HashMap(); CSQueue newRoot = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, TestUtils.spyHook); @@ -2405,7 +2406,7 @@ public void testNodeLocalityAfterQueueRefresh() throws Exception { .NODE_LOCALITY_DELAY, 60); Map newQueues = new HashMap(); CSQueue newRoot = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, TestUtils.spyHook); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index d8759693dd8..a36db44d7ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -97,10 +97,11 @@ public void setUp() throws Exception { when(csContext.getClusterResource()). thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getNonPartitionedQueueComparator()). - thenReturn(CapacityScheduler.nonPartitionedQueueComparator); + thenReturn( + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getResourceCalculator()). - thenReturn(resourceComparator); + thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); } @@ -231,7 +232,7 @@ public void testSingleLevelQueues() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -346,7 +347,7 @@ public void testSingleLevelQueuesPrecision() throws Exception { Map queues = new HashMap(); boolean exceptionOccured = false; try { - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException ie) { @@ -360,7 +361,7 @@ public void testSingleLevelQueuesPrecision() throws Exception { exceptionOccured = false; queues.clear(); try { - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException ie) { @@ -374,7 +375,7 @@ public void testSingleLevelQueuesPrecision() throws Exception { exceptionOccured = false; queues.clear(); try { - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException ie) { @@ -467,7 +468,7 @@ public void testMultiLevelQueues() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -623,8 +624,8 @@ public void testQueueCapacitySettingChildZero() throws Exception { csConf.setCapacity(Q_B + "." + B3, 0); Map queues = new HashMap(); - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } @@ -640,8 +641,8 @@ public void testQueueCapacitySettingParentZero() throws Exception { csConf.setCapacity(Q_A, 60); Map queues = new HashMap(); - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } @@ -662,8 +663,8 @@ public void testQueueCapacityZero() throws Exception { Map queues = new HashMap(); try { - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); } catch (IllegalArgumentException e) { fail("Failed to create queues with 0 capacity: " + e); @@ -678,7 +679,7 @@ public void testOffSwitchScheduling() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -754,8 +755,8 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception { //B3 Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); // Setup some nodes @@ -850,12 +851,12 @@ public void testQueueAcl() throws Exception { Map queues = new HashMap(); CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); YarnAuthorizationProvider authorizer = YarnAuthorizationProvider.getInstance(conf); - CapacityScheduler.setQueueAcls(authorizer, queues); + CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues); UserGroupInformation user = UserGroupInformation.getCurrentUser(); // Setup queue configs diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index f6caa50a315..3e05456a67e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -134,7 +134,7 @@ private void setup(CapacitySchedulerConfiguration csConf, when(csContext.getClusterResource()).thenReturn( Resources.createResource(100 * 16 * GB, 100 * 12)); when(csContext.getNonPartitionedQueueComparator()).thenReturn( - CapacityScheduler.nonPartitionedQueueComparator); + CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); when(csContext.getRMContext()).thenReturn(rmContext); @@ -144,7 +144,7 @@ private void setup(CapacitySchedulerConfiguration csConf, when(csContext.getContainerTokenSecretManager()).thenReturn( containerTokenSecretManager); - root = CapacityScheduler.parseQueue(csContext, csConf, null, + root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); ResourceUsage queueResUsage = root.getQueueResourceUsage(); @@ -1180,8 +1180,8 @@ public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, csConf.setBoolean( CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false); Map newQueues = new HashMap(); - CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, newQueues, queues, + CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext, + csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues, TestUtils.spyHook); queues = newQueues; root.reinitialize(newRoot, cs.getClusterResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index e34ee3467aa..b982faba6c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -141,7 +141,7 @@ public Resource answer(InvocationOnMock invocation) throws Throwable { /** * Hook to spy on queues. */ - static class SpyHook extends CapacityScheduler.QueueHook { + static class SpyHook extends CapacitySchedulerQueueManager.QueueHook { @Override public CSQueue hook(CSQueue queue) { return spy(queue);