YARN-5761. Separate QueueManager from Scheduler. (Xuan Gong via gtcarrera9)
(cherry picked from commit 69fb70c31a
)
This commit is contained in:
parent
39b74b6f9e
commit
8ffe86f780
|
@ -0,0 +1,75 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Context of the Queues in Scheduler.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public interface SchedulerQueueManager<T extends Queue,
|
||||||
|
E extends ReservationSchedulerConfiguration> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the root queue.
|
||||||
|
* @return root queue
|
||||||
|
*/
|
||||||
|
T getRootQueue();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all the queues.
|
||||||
|
* @return a map contains all the queues as well as related queue names
|
||||||
|
*/
|
||||||
|
Map<String, T> getQueues();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the queue from the existing queue.
|
||||||
|
* @param queueName the queue name
|
||||||
|
*/
|
||||||
|
void removeQueue(String queueName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a new queue to the existing queues.
|
||||||
|
* @param queueName the queue name
|
||||||
|
* @param queue the queue object
|
||||||
|
*/
|
||||||
|
void addQueue(String queueName, T queue);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a queue matching the specified queue name.
|
||||||
|
* @param queueName the queue name
|
||||||
|
* @return a queue object
|
||||||
|
*/
|
||||||
|
T getQueue(String queueName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reinitialize the queues.
|
||||||
|
* @param newConf the configuration
|
||||||
|
* @throws IOException if fails to re-initialize queues
|
||||||
|
*/
|
||||||
|
void reinitializeQueues(E newConf) throws IOException;
|
||||||
|
}
|
|
@ -25,7 +25,6 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -68,8 +67,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
import org.apache.hadoop.yarn.security.Permission;
|
|
||||||
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
||||||
|
@ -157,9 +154,9 @@ public class CapacityScheduler extends
|
||||||
ResourceAllocationCommitter {
|
ResourceAllocationCommitter {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
|
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
|
||||||
private YarnAuthorizationProvider authorizer;
|
|
||||||
|
|
||||||
private CSQueue root;
|
private CapacitySchedulerQueueManager queueManager;
|
||||||
|
|
||||||
// timeout to join when we stop this service
|
// timeout to join when we stop this service
|
||||||
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
|
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
|
||||||
|
|
||||||
|
@ -169,22 +166,6 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
private int offswitchPerHeartbeatLimit;
|
private int offswitchPerHeartbeatLimit;
|
||||||
|
|
||||||
static final Comparator<CSQueue> nonPartitionedQueueComparator =
|
|
||||||
new Comparator<CSQueue>() {
|
|
||||||
@Override
|
|
||||||
public int compare(CSQueue q1, CSQueue q2) {
|
|
||||||
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
|
|
||||||
return -1;
|
|
||||||
} else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return q1.getQueuePath().compareTo(q2.getQueuePath());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
static final PartitionedQueueComparator partitionedQueueComparator =
|
|
||||||
new PartitionedQueueComparator();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setConf(Configuration conf) {
|
public void setConf(Configuration conf) {
|
||||||
|
@ -237,8 +218,6 @@ public class CapacityScheduler extends
|
||||||
private CapacitySchedulerConfiguration conf;
|
private CapacitySchedulerConfiguration conf;
|
||||||
private Configuration yarnConf;
|
private Configuration yarnConf;
|
||||||
|
|
||||||
private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
|
|
||||||
|
|
||||||
private ResourceCalculator calculator;
|
private ResourceCalculator calculator;
|
||||||
private boolean usePortForNodeName;
|
private boolean usePortForNodeName;
|
||||||
|
|
||||||
|
@ -262,11 +241,11 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public QueueMetrics getRootQueueMetrics() {
|
public QueueMetrics getRootQueueMetrics() {
|
||||||
return root.getMetrics();
|
return getRootQueue().getMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
public CSQueue getRootQueue() {
|
public CSQueue getRootQueue() {
|
||||||
return root;
|
return queueManager.getRootQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -291,12 +270,12 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Comparator<CSQueue> getNonPartitionedQueueComparator() {
|
public Comparator<CSQueue> getNonPartitionedQueueComparator() {
|
||||||
return nonPartitionedQueueComparator;
|
return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PartitionedQueueComparator getPartitionedQueueComparator() {
|
public PartitionedQueueComparator getPartitionedQueueComparator() {
|
||||||
return partitionedQueueComparator;
|
return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -327,7 +306,10 @@ public class CapacityScheduler extends
|
||||||
this.usePortForNodeName = this.conf.getUsePortForNodeName();
|
this.usePortForNodeName = this.conf.getUsePortForNodeName();
|
||||||
this.applications = new ConcurrentHashMap<>();
|
this.applications = new ConcurrentHashMap<>();
|
||||||
this.labelManager = rmContext.getNodeLabelManager();
|
this.labelManager = rmContext.getNodeLabelManager();
|
||||||
authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
|
this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
|
||||||
|
this.labelManager);
|
||||||
|
this.queueManager.setCapacitySchedulerContext(this);
|
||||||
|
|
||||||
this.activitiesManager = new ActivitiesManager(rmContext);
|
this.activitiesManager = new ActivitiesManager(rmContext);
|
||||||
activitiesManager.init(conf);
|
activitiesManager.init(conf);
|
||||||
initializeQueues(this.conf);
|
initializeQueues(this.conf);
|
||||||
|
@ -555,13 +537,6 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class QueueHook {
|
|
||||||
public CSQueue hook(CSQueue queue) {
|
|
||||||
return queue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
private static final QueueHook noop = new QueueHook();
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public UserGroupMappingPlacementRule
|
public UserGroupMappingPlacementRule
|
||||||
getUserGroupMappingPlacementRule() throws IOException {
|
getUserGroupMappingPlacementRule() throws IOException {
|
||||||
|
@ -579,7 +554,7 @@ public class CapacityScheduler extends
|
||||||
if (!mappingQueue.equals(
|
if (!mappingQueue.equals(
|
||||||
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
|
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
|
||||||
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
|
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
|
||||||
CSQueue queue = queues.get(mappingQueue);
|
CSQueue queue = getQueue(mappingQueue);
|
||||||
if (queue == null || !(queue instanceof LeafQueue)) {
|
if (queue == null || !(queue instanceof LeafQueue)) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"mapping contains invalid or non-leaf queue " + mappingQueue);
|
"mapping contains invalid or non-leaf queue " + mappingQueue);
|
||||||
|
@ -617,184 +592,29 @@ public class CapacityScheduler extends
|
||||||
private void initializeQueues(CapacitySchedulerConfiguration conf)
|
private void initializeQueues(CapacitySchedulerConfiguration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
root =
|
this.queueManager.initializeQueues(conf);
|
||||||
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
|
|
||||||
queues, queues, noop);
|
|
||||||
labelManager.reinitializeQueueLabels(getQueueToLabels());
|
|
||||||
LOG.info("Initialized root queue " + root);
|
|
||||||
updatePlacementRules();
|
updatePlacementRules();
|
||||||
setQueueAcls(authorizer, queues);
|
|
||||||
|
|
||||||
// Notify Preemption Manager
|
// Notify Preemption Manager
|
||||||
preemptionManager.refreshQueues(null, root);
|
preemptionManager.refreshQueues(null, this.getRootQueue());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Lock(CapacityScheduler.class)
|
@Lock(CapacityScheduler.class)
|
||||||
private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
|
private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// Parse new queues
|
this.queueManager.reinitializeQueues(newConf);
|
||||||
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
|
||||||
CSQueue newRoot =
|
|
||||||
parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
|
|
||||||
newQueues, queues, noop);
|
|
||||||
|
|
||||||
// Ensure all existing queues are still present
|
|
||||||
validateExistingQueues(queues, newQueues);
|
|
||||||
|
|
||||||
// Add new queues
|
|
||||||
addNewQueues(queues, newQueues);
|
|
||||||
|
|
||||||
// Re-configure queues
|
|
||||||
root.reinitialize(newRoot, getClusterResource());
|
|
||||||
updatePlacementRules();
|
updatePlacementRules();
|
||||||
|
|
||||||
// Re-calculate headroom for active applications
|
|
||||||
Resource clusterResource = getClusterResource();
|
|
||||||
root.updateClusterResource(clusterResource, new ResourceLimits(
|
|
||||||
clusterResource));
|
|
||||||
|
|
||||||
labelManager.reinitializeQueueLabels(getQueueToLabels());
|
|
||||||
setQueueAcls(authorizer, queues);
|
|
||||||
|
|
||||||
// Notify Preemption Manager
|
// Notify Preemption Manager
|
||||||
preemptionManager.refreshQueues(null, root);
|
preemptionManager.refreshQueues(null, this.getRootQueue());
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public static void setQueueAcls(YarnAuthorizationProvider authorizer,
|
|
||||||
Map<String, CSQueue> queues) throws IOException {
|
|
||||||
List<Permission> permissions = new ArrayList<>();
|
|
||||||
for (CSQueue queue : queues.values()) {
|
|
||||||
AbstractCSQueue csQueue = (AbstractCSQueue) queue;
|
|
||||||
permissions.add(
|
|
||||||
new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
|
|
||||||
}
|
|
||||||
authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser());
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, Set<String>> getQueueToLabels() {
|
|
||||||
Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
|
|
||||||
for (CSQueue queue : queues.values()) {
|
|
||||||
queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
|
|
||||||
}
|
|
||||||
return queueToLabels;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Ensure all existing queues are present. Queues cannot be deleted
|
|
||||||
* @param queues existing queues
|
|
||||||
* @param newQueues new queues
|
|
||||||
*/
|
|
||||||
@Lock(CapacityScheduler.class)
|
|
||||||
private void validateExistingQueues(
|
|
||||||
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
|
|
||||||
throws IOException {
|
|
||||||
// check that all static queues are included in the newQueues list
|
|
||||||
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
|
|
||||||
if (!(e.getValue() instanceof ReservationQueue)) {
|
|
||||||
String queueName = e.getKey();
|
|
||||||
CSQueue oldQueue = e.getValue();
|
|
||||||
CSQueue newQueue = newQueues.get(queueName);
|
|
||||||
if (null == newQueue) {
|
|
||||||
throw new IOException(queueName + " cannot be found during refresh!");
|
|
||||||
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
|
|
||||||
throw new IOException(queueName + " is moved from:"
|
|
||||||
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
|
|
||||||
+ " after refresh, which is not allowed.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add the new queues (only) to our list of queues...
|
|
||||||
* ... be careful, do not overwrite existing queues.
|
|
||||||
* @param queues
|
|
||||||
* @param newQueues
|
|
||||||
*/
|
|
||||||
@Lock(CapacityScheduler.class)
|
|
||||||
private void addNewQueues(
|
|
||||||
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
|
|
||||||
{
|
|
||||||
for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
|
|
||||||
String queueName = e.getKey();
|
|
||||||
CSQueue queue = e.getValue();
|
|
||||||
if (!queues.containsKey(queueName)) {
|
|
||||||
queues.put(queueName, queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Lock(CapacityScheduler.class)
|
|
||||||
static CSQueue parseQueue(
|
|
||||||
CapacitySchedulerContext csContext,
|
|
||||||
CapacitySchedulerConfiguration conf,
|
|
||||||
CSQueue parent, String queueName, Map<String, CSQueue> queues,
|
|
||||||
Map<String, CSQueue> oldQueues,
|
|
||||||
QueueHook hook) throws IOException {
|
|
||||||
CSQueue queue;
|
|
||||||
String fullQueueName =
|
|
||||||
(parent == null) ? queueName
|
|
||||||
: (parent.getQueuePath() + "." + queueName);
|
|
||||||
String[] childQueueNames =
|
|
||||||
conf.getQueues(fullQueueName);
|
|
||||||
boolean isReservableQueue = conf.isReservable(fullQueueName);
|
|
||||||
if (childQueueNames == null || childQueueNames.length == 0) {
|
|
||||||
if (null == parent) {
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"Queue configuration missing child queue names for " + queueName);
|
|
||||||
}
|
|
||||||
// Check if the queue will be dynamically managed by the Reservation
|
|
||||||
// system
|
|
||||||
if (isReservableQueue) {
|
|
||||||
queue =
|
|
||||||
new PlanQueue(csContext, queueName, parent,
|
|
||||||
oldQueues.get(queueName));
|
|
||||||
} else {
|
|
||||||
queue =
|
|
||||||
new LeafQueue(csContext, queueName, parent,
|
|
||||||
oldQueues.get(queueName));
|
|
||||||
|
|
||||||
// Used only for unit tests
|
|
||||||
queue = hook.hook(queue);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (isReservableQueue) {
|
|
||||||
throw new IllegalStateException(
|
|
||||||
"Only Leaf Queues can be reservable for " + queueName);
|
|
||||||
}
|
|
||||||
ParentQueue parentQueue =
|
|
||||||
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
|
|
||||||
|
|
||||||
// Used only for unit tests
|
|
||||||
queue = hook.hook(parentQueue);
|
|
||||||
|
|
||||||
List<CSQueue> childQueues = new ArrayList<CSQueue>();
|
|
||||||
for (String childQueueName : childQueueNames) {
|
|
||||||
CSQueue childQueue =
|
|
||||||
parseQueue(csContext, conf, queue, childQueueName,
|
|
||||||
queues, oldQueues, hook);
|
|
||||||
childQueues.add(childQueue);
|
|
||||||
}
|
|
||||||
parentQueue.setChildQueues(childQueues);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queue instanceof LeafQueue && queues.containsKey(queueName)
|
|
||||||
&& queues.get(queueName) instanceof LeafQueue) {
|
|
||||||
throw new IOException("Two leaf queues were named " + queueName
|
|
||||||
+ ". Leaf queue names must be distinct");
|
|
||||||
}
|
|
||||||
queues.put(queueName, queue);
|
|
||||||
|
|
||||||
LOG.info("Initialized queue: " + queue);
|
|
||||||
return queue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public CSQueue getQueue(String queueName) {
|
public CSQueue getQueue(String queueName) {
|
||||||
if (queueName == null) {
|
if (queueName == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return queues.get(queueName);
|
return this.queueManager.getQueue(queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addApplicationOnRecovery(
|
private void addApplicationOnRecovery(
|
||||||
|
@ -1048,7 +868,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
// Inform the queue
|
// Inform the queue
|
||||||
String queueName = attempt.getQueue().getQueueName();
|
String queueName = attempt.getQueue().getQueueName();
|
||||||
CSQueue queue = queues.get(queueName);
|
CSQueue queue = this.getQueue(queueName);
|
||||||
if (!(queue instanceof LeafQueue)) {
|
if (!(queue instanceof LeafQueue)) {
|
||||||
LOG.error(
|
LOG.error(
|
||||||
"Cannot finish application " + "from non-leaf queue: " + queueName);
|
"Cannot finish application " + "from non-leaf queue: " + queueName);
|
||||||
|
@ -1175,7 +995,7 @@ public class CapacityScheduler extends
|
||||||
boolean includeChildQueues, boolean recursive)
|
boolean includeChildQueues, boolean recursive)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
CSQueue queue = null;
|
CSQueue queue = null;
|
||||||
queue = this.queues.get(queueName);
|
queue = this.getQueue(queueName);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
throw new IOException("Unknown queue: " + queueName);
|
throw new IOException("Unknown queue: " + queueName);
|
||||||
}
|
}
|
||||||
|
@ -1193,7 +1013,7 @@ public class CapacityScheduler extends
|
||||||
return new ArrayList<QueueUserACLInfo>();
|
return new ArrayList<QueueUserACLInfo>();
|
||||||
}
|
}
|
||||||
|
|
||||||
return root.getQueueUserAclInfo(user);
|
return getRootQueue().getQueueUserAclInfo(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1236,7 +1056,7 @@ public class CapacityScheduler extends
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
updateNodeResource(nm, resourceOption);
|
updateNodeResource(nm, resourceOption);
|
||||||
Resource clusterResource = getClusterResource();
|
Resource clusterResource = getClusterResource();
|
||||||
root.updateClusterResource(clusterResource,
|
getRootQueue().updateClusterResource(clusterResource,
|
||||||
new ResourceLimits(clusterResource));
|
new ResourceLimits(clusterResource));
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
@ -1472,8 +1292,8 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
private CSAssignment allocateOrReserveNewContainers(
|
private CSAssignment allocateOrReserveNewContainers(
|
||||||
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
|
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
|
||||||
CSAssignment assignment = root.assignContainers(getClusterResource(), ps,
|
CSAssignment assignment = getRootQueue().assignContainers(
|
||||||
new ResourceLimits(labelManager
|
getClusterResource(), ps, new ResourceLimits(labelManager
|
||||||
.getResourceByLabel(ps.getPartition(), getClusterResource())),
|
.getResourceByLabel(ps.getPartition(), getClusterResource())),
|
||||||
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
||||||
|
|
||||||
|
@ -1507,7 +1327,7 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to use NON_EXCLUSIVE
|
// Try to use NON_EXCLUSIVE
|
||||||
assignment = root.assignContainers(getClusterResource(), ps,
|
assignment = getRootQueue().assignContainers(getClusterResource(), ps,
|
||||||
// TODO, now we only consider limits for parent for non-labeled
|
// TODO, now we only consider limits for parent for non-labeled
|
||||||
// resources, should consider labeled resources as well.
|
// resources, should consider labeled resources as well.
|
||||||
new ResourceLimits(labelManager
|
new ResourceLimits(labelManager
|
||||||
|
@ -1527,8 +1347,8 @@ public class CapacityScheduler extends
|
||||||
PlacementSet<FiCaSchedulerNode> ps) {
|
PlacementSet<FiCaSchedulerNode> ps) {
|
||||||
// When this time look at multiple nodes, try schedule if the
|
// When this time look at multiple nodes, try schedule if the
|
||||||
// partition has any available resource or killable resource
|
// partition has any available resource or killable resource
|
||||||
if (root.getQueueCapacities().getUsedCapacity(ps.getPartition()) >= 1.0f
|
if (getRootQueue().getQueueCapacities().getUsedCapacity(
|
||||||
&& preemptionManager.getKillableResource(
|
ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
|
||||||
CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
|
CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
|
||||||
.none()) {
|
.none()) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -1711,7 +1531,7 @@ public class CapacityScheduler extends
|
||||||
updateLabelsOnNode(id, labels);
|
updateLabelsOnNode(id, labels);
|
||||||
}
|
}
|
||||||
Resource clusterResource = getClusterResource();
|
Resource clusterResource = getClusterResource();
|
||||||
root.updateClusterResource(clusterResource,
|
getRootQueue().updateClusterResource(clusterResource,
|
||||||
new ResourceLimits(clusterResource));
|
new ResourceLimits(clusterResource));
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
@ -1732,7 +1552,7 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
Resource clusterResource = getClusterResource();
|
Resource clusterResource = getClusterResource();
|
||||||
root.updateClusterResource(clusterResource,
|
getRootQueue().updateClusterResource(clusterResource,
|
||||||
new ResourceLimits(clusterResource));
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
LOG.info(
|
LOG.info(
|
||||||
|
@ -1783,7 +1603,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
nodeTracker.removeNode(nodeId);
|
nodeTracker.removeNode(nodeId);
|
||||||
Resource clusterResource = getClusterResource();
|
Resource clusterResource = getClusterResource();
|
||||||
root.updateClusterResource(clusterResource,
|
getRootQueue().updateClusterResource(clusterResource,
|
||||||
new ResourceLimits(clusterResource));
|
new ResourceLimits(clusterResource));
|
||||||
int numNodes = nodeTracker.nodeCount();
|
int numNodes = nodeTracker.nodeCount();
|
||||||
|
|
||||||
|
@ -2021,7 +1841,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
|
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
|
||||||
CSQueue queue = queues.get(queueName);
|
CSQueue queue = getQueue(queueName);
|
||||||
if (queue == null) {
|
if (queue == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -2031,7 +1851,8 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isSystemAppsLimitReached() {
|
public boolean isSystemAppsLimitReached() {
|
||||||
if (root.getNumApplications() < conf.getMaximumSystemApplications()) {
|
if (getRootQueue().getNumApplications() < conf
|
||||||
|
.getMaximumSystemApplications()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -2132,7 +1953,7 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
|
((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
|
||||||
this.queues.remove(queueName);
|
this.queueManager.removeQueue(queueName);
|
||||||
LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
|
LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
@ -2161,7 +1982,7 @@ public class CapacityScheduler extends
|
||||||
PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
|
PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
|
||||||
String queuename = newQueue.getQueueName();
|
String queuename = newQueue.getQueueName();
|
||||||
parentPlan.addChildQueue(newQueue);
|
parentPlan.addChildQueue(newQueue);
|
||||||
this.queues.put(queuename, newQueue);
|
this.queueManager.addQueue(queuename, newQueue);
|
||||||
LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
|
LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
@ -2173,7 +1994,7 @@ public class CapacityScheduler extends
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
LeafQueue queue = getAndCheckLeafQueue(inQueue);
|
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
|
||||||
ParentQueue parent = (ParentQueue) queue.getParent();
|
ParentQueue parent = (ParentQueue) queue.getParent();
|
||||||
|
|
||||||
if (!(queue instanceof ReservationQueue)) {
|
if (!(queue instanceof ReservationQueue)) {
|
||||||
|
@ -2225,9 +2046,10 @@ public class CapacityScheduler extends
|
||||||
FiCaSchedulerApp app = getApplicationAttempt(
|
FiCaSchedulerApp app = getApplicationAttempt(
|
||||||
ApplicationAttemptId.newInstance(appId, 0));
|
ApplicationAttemptId.newInstance(appId, 0));
|
||||||
String sourceQueueName = app.getQueue().getQueueName();
|
String sourceQueueName = app.getQueue().getQueueName();
|
||||||
LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
|
LeafQueue source = this.queueManager.getAndCheckLeafQueue(
|
||||||
|
sourceQueueName);
|
||||||
String destQueueName = handleMoveToPlanQueue(targetQueueName);
|
String destQueueName = handleMoveToPlanQueue(targetQueueName);
|
||||||
LeafQueue dest = getAndCheckLeafQueue(destQueueName);
|
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
|
||||||
// Validation check - ACLs, submission limits for user & queue
|
// Validation check - ACLs, submission limits for user & queue
|
||||||
String user = app.getUser();
|
String user = app.getUser();
|
||||||
checkQueuePartition(app, dest);
|
checkQueuePartition(app, dest);
|
||||||
|
@ -2291,27 +2113,6 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Check that the String provided in input is the name of an existing,
|
|
||||||
* LeafQueue, if successful returns the queue.
|
|
||||||
*
|
|
||||||
* @param queue
|
|
||||||
* @return the LeafQueue
|
|
||||||
* @throws YarnException
|
|
||||||
*/
|
|
||||||
private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
|
|
||||||
CSQueue ret = this.getQueue(queue);
|
|
||||||
if (ret == null) {
|
|
||||||
throw new YarnException("The specified Queue: " + queue
|
|
||||||
+ " doesn't exist");
|
|
||||||
}
|
|
||||||
if (!(ret instanceof LeafQueue)) {
|
|
||||||
throw new YarnException("The specified Queue: " + queue
|
|
||||||
+ " is not a Leaf Queue. Move is supported only for Leaf Queues.");
|
|
||||||
}
|
|
||||||
return (LeafQueue) ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** {@inheritDoc} */
|
/** {@inheritDoc} */
|
||||||
@Override
|
@Override
|
||||||
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
|
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
|
||||||
|
@ -2348,7 +2149,7 @@ public class CapacityScheduler extends
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getPlanQueues() {
|
public Set<String> getPlanQueues() {
|
||||||
Set<String> ret = new HashSet<String>();
|
Set<String> ret = new HashSet<String>();
|
||||||
for (Map.Entry<String, CSQueue> l : queues.entrySet()) {
|
for (Map.Entry<String, CSQueue> l : queueManager.getQueues().entrySet()) {
|
||||||
if (l.getValue() instanceof PlanQueue) {
|
if (l.getValue() instanceof PlanQueue) {
|
||||||
ret.add(l.getKey());
|
ret.add(l.getKey());
|
||||||
}
|
}
|
||||||
|
@ -2368,7 +2169,8 @@ public class CapacityScheduler extends
|
||||||
if (null == priorityFromContext) {
|
if (null == priorityFromContext) {
|
||||||
// Get the default priority for the Queue. If Queue is non-existent, then
|
// Get the default priority for the Queue. If Queue is non-existent, then
|
||||||
// use default priority
|
// use default priority
|
||||||
priorityFromContext = getDefaultPriorityForQueue(queueName);
|
priorityFromContext = this.queueManager.getDefaultPriorityForQueue(
|
||||||
|
queueName);
|
||||||
|
|
||||||
LOG.info("Application '" + applicationId
|
LOG.info("Application '" + applicationId
|
||||||
+ "' is submitted without priority "
|
+ "' is submitted without priority "
|
||||||
|
@ -2392,18 +2194,6 @@ public class CapacityScheduler extends
|
||||||
return appPriority;
|
return appPriority;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Priority getDefaultPriorityForQueue(String queueName) {
|
|
||||||
Queue queue = getQueue(queueName);
|
|
||||||
if (null == queue || null == queue.getDefaultApplicationPriority()) {
|
|
||||||
// Return with default application priority
|
|
||||||
return Priority.newInstance(CapacitySchedulerConfiguration
|
|
||||||
.DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
|
|
||||||
}
|
|
||||||
|
|
||||||
return Priority.newInstance(queue.getDefaultApplicationPriority()
|
|
||||||
.getPriority());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Priority updateApplicationPriority(Priority newPriority,
|
public Priority updateApplicationPriority(Priority newPriority,
|
||||||
ApplicationId applicationId, SettableFuture<Object> future)
|
ApplicationId applicationId, SettableFuture<Object> future)
|
||||||
|
@ -2457,7 +2247,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResourceUsage getClusterResourceUsage() {
|
public ResourceUsage getClusterResourceUsage() {
|
||||||
return root.getQueueResourceUsage();
|
return getRootQueue().getQueueResourceUsage();
|
||||||
}
|
}
|
||||||
|
|
||||||
private SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> getSchedulerContainer(
|
private SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> getSchedulerContainer(
|
||||||
|
|
|
@ -0,0 +1,361 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.security.Permission;
|
||||||
|
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Context of the Queues in Capacity Scheduler.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@Unstable
|
||||||
|
public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
|
CSQueue, CapacitySchedulerConfiguration>{
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(
|
||||||
|
CapacitySchedulerQueueManager.class);
|
||||||
|
|
||||||
|
static final Comparator<CSQueue> NON_PARTITIONED_QUEUE_COMPARATOR =
|
||||||
|
new Comparator<CSQueue>() {
|
||||||
|
@Override
|
||||||
|
public int compare(CSQueue q1, CSQueue q2) {
|
||||||
|
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
|
||||||
|
return -1;
|
||||||
|
} else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return q1.getQueuePath().compareTo(q2.getQueuePath());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR =
|
||||||
|
new PartitionedQueueComparator();
|
||||||
|
|
||||||
|
static class QueueHook {
|
||||||
|
public CSQueue hook(CSQueue queue) {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final QueueHook NOOP = new QueueHook();
|
||||||
|
private CapacitySchedulerContext csContext;
|
||||||
|
private final YarnAuthorizationProvider authorizer;
|
||||||
|
private final Map<String, CSQueue> queues = new ConcurrentHashMap<>();
|
||||||
|
private CSQueue root;
|
||||||
|
private final RMNodeLabelsManager labelManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct the service.
|
||||||
|
* @param conf the configuration
|
||||||
|
* @param labelManager the labelManager
|
||||||
|
*/
|
||||||
|
public CapacitySchedulerQueueManager(Configuration conf,
|
||||||
|
RMNodeLabelsManager labelManager) {
|
||||||
|
this.authorizer = YarnAuthorizationProvider.getInstance(conf);
|
||||||
|
this.labelManager = labelManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CSQueue getRootQueue() {
|
||||||
|
return this.root;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, CSQueue> getQueues() {
|
||||||
|
return queues;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeQueue(String queueName) {
|
||||||
|
this.queues.remove(queueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addQueue(String queueName, CSQueue queue) {
|
||||||
|
this.queues.put(queueName, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CSQueue getQueue(String queueName) {
|
||||||
|
return queues.get(queueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the CapacitySchedulerContext.
|
||||||
|
* @param capacitySchedulerContext the CapacitySchedulerContext
|
||||||
|
*/
|
||||||
|
public void setCapacitySchedulerContext(
|
||||||
|
CapacitySchedulerContext capacitySchedulerContext) {
|
||||||
|
this.csContext = capacitySchedulerContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialized the queues.
|
||||||
|
* @param conf the CapacitySchedulerConfiguration
|
||||||
|
* @throws IOException if fails to initialize queues
|
||||||
|
*/
|
||||||
|
public void initializeQueues(CapacitySchedulerConfiguration conf)
|
||||||
|
throws IOException {
|
||||||
|
root = parseQueue(this.csContext, conf, null,
|
||||||
|
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
|
||||||
|
setQueueAcls(authorizer, queues);
|
||||||
|
labelManager.reinitializeQueueLabels(getQueueToLabels());
|
||||||
|
LOG.info("Initialized root queue " + root);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
|
||||||
|
throws IOException {
|
||||||
|
// Parse new queues
|
||||||
|
Map<String, CSQueue> newQueues = new HashMap<>();
|
||||||
|
CSQueue newRoot = parseQueue(this.csContext, newConf, null,
|
||||||
|
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
|
||||||
|
|
||||||
|
// Ensure all existing queues are still present
|
||||||
|
validateExistingQueues(queues, newQueues);
|
||||||
|
|
||||||
|
// Add new queues
|
||||||
|
addNewQueues(queues, newQueues);
|
||||||
|
|
||||||
|
// Re-configure queues
|
||||||
|
root.reinitialize(newRoot, this.csContext.getClusterResource());
|
||||||
|
|
||||||
|
setQueueAcls(authorizer, queues);
|
||||||
|
|
||||||
|
// Re-calculate headroom for active applications
|
||||||
|
Resource clusterResource = this.csContext.getClusterResource();
|
||||||
|
root.updateClusterResource(clusterResource, new ResourceLimits(
|
||||||
|
clusterResource));
|
||||||
|
|
||||||
|
labelManager.reinitializeQueueLabels(getQueueToLabels());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse the queue from the configuration.
|
||||||
|
* @param csContext the CapacitySchedulerContext
|
||||||
|
* @param conf the CapacitySchedulerConfiguration
|
||||||
|
* @param parent the parent queue
|
||||||
|
* @param queueName the queue name
|
||||||
|
* @param queues all the queues
|
||||||
|
* @param oldQueues the old queues
|
||||||
|
* @param hook the queue hook
|
||||||
|
* @return the CSQueue
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
static CSQueue parseQueue(
|
||||||
|
CapacitySchedulerContext csContext,
|
||||||
|
CapacitySchedulerConfiguration conf,
|
||||||
|
CSQueue parent, String queueName, Map<String, CSQueue> queues,
|
||||||
|
Map<String, CSQueue> oldQueues,
|
||||||
|
QueueHook hook) throws IOException {
|
||||||
|
CSQueue queue;
|
||||||
|
String fullQueueName =
|
||||||
|
(parent == null) ? queueName
|
||||||
|
: (parent.getQueuePath() + "." + queueName);
|
||||||
|
String[] childQueueNames = conf.getQueues(fullQueueName);
|
||||||
|
boolean isReservableQueue = conf.isReservable(fullQueueName);
|
||||||
|
if (childQueueNames == null || childQueueNames.length == 0) {
|
||||||
|
if (null == parent) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Queue configuration missing child queue names for " + queueName);
|
||||||
|
}
|
||||||
|
// Check if the queue will be dynamically managed by the Reservation
|
||||||
|
// system
|
||||||
|
if (isReservableQueue) {
|
||||||
|
queue =
|
||||||
|
new PlanQueue(csContext, queueName, parent,
|
||||||
|
oldQueues.get(queueName));
|
||||||
|
} else {
|
||||||
|
queue =
|
||||||
|
new LeafQueue(csContext, queueName, parent,
|
||||||
|
oldQueues.get(queueName));
|
||||||
|
|
||||||
|
// Used only for unit tests
|
||||||
|
queue = hook.hook(queue);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (isReservableQueue) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"Only Leaf Queues can be reservable for " + queueName);
|
||||||
|
}
|
||||||
|
ParentQueue parentQueue =
|
||||||
|
new ParentQueue(csContext, queueName, parent,
|
||||||
|
oldQueues.get(queueName));
|
||||||
|
|
||||||
|
// Used only for unit tests
|
||||||
|
queue = hook.hook(parentQueue);
|
||||||
|
|
||||||
|
List<CSQueue> childQueues = new ArrayList<>();
|
||||||
|
for (String childQueueName : childQueueNames) {
|
||||||
|
CSQueue childQueue =
|
||||||
|
parseQueue(csContext, conf, queue, childQueueName,
|
||||||
|
queues, oldQueues, hook);
|
||||||
|
childQueues.add(childQueue);
|
||||||
|
}
|
||||||
|
parentQueue.setChildQueues(childQueues);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (queue instanceof LeafQueue && queues.containsKey(queueName)
|
||||||
|
&& queues.get(queueName) instanceof LeafQueue) {
|
||||||
|
throw new IOException("Two leaf queues were named " + queueName
|
||||||
|
+ ". Leaf queue names must be distinct");
|
||||||
|
}
|
||||||
|
queues.put(queueName, queue);
|
||||||
|
|
||||||
|
LOG.info("Initialized queue: " + queue);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure all existing queues are present. Queues cannot be deleted
|
||||||
|
* @param queues existing queues
|
||||||
|
* @param newQueues new queues
|
||||||
|
*/
|
||||||
|
private void validateExistingQueues(
|
||||||
|
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
|
||||||
|
throws IOException {
|
||||||
|
// check that all static queues are included in the newQueues list
|
||||||
|
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
|
||||||
|
if (!(e.getValue() instanceof ReservationQueue)) {
|
||||||
|
String queueName = e.getKey();
|
||||||
|
CSQueue oldQueue = e.getValue();
|
||||||
|
CSQueue newQueue = newQueues.get(queueName);
|
||||||
|
if (null == newQueue) {
|
||||||
|
throw new IOException(queueName + " cannot be found during refresh!");
|
||||||
|
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
|
||||||
|
throw new IOException(queueName + " is moved from:"
|
||||||
|
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
|
||||||
|
+ " after refresh, which is not allowed.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the new queues (only) to our list of queues...
|
||||||
|
* ... be careful, do not overwrite existing queues.
|
||||||
|
* @param queues the existing queues
|
||||||
|
* @param newQueues the new queues
|
||||||
|
*/
|
||||||
|
private void addNewQueues(
|
||||||
|
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) {
|
||||||
|
for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
|
||||||
|
String queueName = e.getKey();
|
||||||
|
CSQueue queue = e.getValue();
|
||||||
|
if (!queues.containsKey(queueName)) {
|
||||||
|
queues.put(queueName, queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
/**
|
||||||
|
* Set the acls for the queues.
|
||||||
|
* @param authorizer the yarnAuthorizationProvider
|
||||||
|
* @param queues the queues
|
||||||
|
* @throws IOException if fails to set queue acls
|
||||||
|
*/
|
||||||
|
public static void setQueueAcls(YarnAuthorizationProvider authorizer,
|
||||||
|
Map<String, CSQueue> queues) throws IOException {
|
||||||
|
List<Permission> permissions = new ArrayList<>();
|
||||||
|
for (CSQueue queue : queues.values()) {
|
||||||
|
AbstractCSQueue csQueue = (AbstractCSQueue) queue;
|
||||||
|
permissions.add(
|
||||||
|
new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
|
||||||
|
}
|
||||||
|
authorizer.setPermission(permissions,
|
||||||
|
UserGroupInformation.getCurrentUser());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check that the String provided in input is the name of an existing,
|
||||||
|
* LeafQueue, if successful returns the queue.
|
||||||
|
*
|
||||||
|
* @param queue the queue name
|
||||||
|
* @return the LeafQueue
|
||||||
|
* @throws YarnException if the queue does not exist or the queue
|
||||||
|
* is not the type of LeafQueue.
|
||||||
|
*/
|
||||||
|
public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
|
||||||
|
CSQueue ret = this.getQueue(queue);
|
||||||
|
if (ret == null) {
|
||||||
|
throw new YarnException("The specified Queue: " + queue
|
||||||
|
+ " doesn't exist");
|
||||||
|
}
|
||||||
|
if (!(ret instanceof LeafQueue)) {
|
||||||
|
throw new YarnException("The specified Queue: " + queue
|
||||||
|
+ " is not a Leaf Queue.");
|
||||||
|
}
|
||||||
|
return (LeafQueue) ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the default priority of the queue.
|
||||||
|
* @param queueName the queue name
|
||||||
|
* @return the default priority of the queue
|
||||||
|
*/
|
||||||
|
public Priority getDefaultPriorityForQueue(String queueName) {
|
||||||
|
Queue queue = getQueue(queueName);
|
||||||
|
if (null == queue || null == queue.getDefaultApplicationPriority()) {
|
||||||
|
// Return with default application priority
|
||||||
|
return Priority.newInstance(CapacitySchedulerConfiguration
|
||||||
|
.DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
|
||||||
|
}
|
||||||
|
return Priority.newInstance(queue.getDefaultApplicationPriority()
|
||||||
|
.getPriority());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a map of queueToLabels.
|
||||||
|
* @return the map of queueToLabels
|
||||||
|
*/
|
||||||
|
private Map<String, Set<String>> getQueueToLabels() {
|
||||||
|
Map<String, Set<String>> queueToLabels = new HashMap<>();
|
||||||
|
for (CSQueue queue : getQueues().values()) {
|
||||||
|
queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
|
||||||
|
}
|
||||||
|
return queueToLabels;
|
||||||
|
}
|
||||||
|
}
|
|
@ -111,7 +111,8 @@ public class TestApplicationLimits {
|
||||||
when(csContext.getClusterResource()).
|
when(csContext.getClusterResource()).
|
||||||
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
|
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
|
||||||
when(csContext.getNonPartitionedQueueComparator()).
|
when(csContext.getNonPartitionedQueueComparator()).
|
||||||
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
thenReturn(
|
||||||
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getResourceCalculator()).
|
when(csContext.getResourceCalculator()).
|
||||||
thenReturn(resourceCalculator);
|
thenReturn(resourceCalculator);
|
||||||
when(csContext.getRMContext()).thenReturn(rmContext);
|
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||||
|
@ -123,9 +124,9 @@ public class TestApplicationLimits {
|
||||||
containerTokenSecretManager);
|
containerTokenSecretManager);
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root = CapacitySchedulerQueueManager
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
|
.parseQueue(csContext, csConf, null, "root",
|
||||||
queues, queues,
|
queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
|
||||||
|
|
||||||
|
@ -276,7 +277,8 @@ public class TestApplicationLimits {
|
||||||
when(csContext.getMaximumResourceCapability()).
|
when(csContext.getMaximumResourceCapability()).
|
||||||
thenReturn(Resources.createResource(16*GB, 16));
|
thenReturn(Resources.createResource(16*GB, 16));
|
||||||
when(csContext.getNonPartitionedQueueComparator()).
|
when(csContext.getNonPartitionedQueueComparator()).
|
||||||
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
thenReturn(
|
||||||
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
||||||
when(csContext.getRMContext()).thenReturn(rmContext);
|
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||||
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
|
@ -288,8 +290,8 @@ public class TestApplicationLimits {
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
queues, queues, TestUtils.spyHook);
|
"root", queues, queues, TestUtils.spyHook);
|
||||||
|
|
||||||
LeafQueue queue = (LeafQueue)queues.get(A);
|
LeafQueue queue = (LeafQueue)queues.get(A);
|
||||||
|
|
||||||
|
@ -356,9 +358,9 @@ public class TestApplicationLimits {
|
||||||
+ ".maximum-am-resource-percent", 0.5f);
|
+ ".maximum-am-resource-percent", 0.5f);
|
||||||
// Re-create queues to get new configs.
|
// Re-create queues to get new configs.
|
||||||
queues = new HashMap<String, CSQueue>();
|
queues = new HashMap<String, CSQueue>();
|
||||||
root =
|
root = CapacitySchedulerQueueManager.parseQueue(
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
|
csContext, csConf, null, "root",
|
||||||
queues, queues, TestUtils.spyHook);
|
queues, queues, TestUtils.spyHook);
|
||||||
clusterResource = Resources.createResource(100 * 16 * GB);
|
clusterResource = Resources.createResource(100 * 16 * GB);
|
||||||
|
|
||||||
queue = (LeafQueue)queues.get(A);
|
queue = (LeafQueue)queues.get(A);
|
||||||
|
@ -378,9 +380,9 @@ public class TestApplicationLimits {
|
||||||
9999);
|
9999);
|
||||||
// Re-create queues to get new configs.
|
// Re-create queues to get new configs.
|
||||||
queues = new HashMap<String, CSQueue>();
|
queues = new HashMap<String, CSQueue>();
|
||||||
root =
|
root = CapacitySchedulerQueueManager.parseQueue(
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
|
csContext, csConf, null, "root",
|
||||||
queues, queues, TestUtils.spyHook);
|
queues, queues, TestUtils.spyHook);
|
||||||
|
|
||||||
queue = (LeafQueue)queues.get(A);
|
queue = (LeafQueue)queues.get(A);
|
||||||
assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
|
assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
|
||||||
|
@ -580,7 +582,8 @@ public class TestApplicationLimits {
|
||||||
when(csContext.getMaximumResourceCapability()).
|
when(csContext.getMaximumResourceCapability()).
|
||||||
thenReturn(Resources.createResource(16*GB));
|
thenReturn(Resources.createResource(16*GB));
|
||||||
when(csContext.getNonPartitionedQueueComparator()).
|
when(csContext.getNonPartitionedQueueComparator()).
|
||||||
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
thenReturn(
|
||||||
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
||||||
when(csContext.getRMContext()).thenReturn(rmContext);
|
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||||
|
|
||||||
|
@ -589,8 +592,8 @@ public class TestApplicationLimits {
|
||||||
when(csContext.getClusterResource()).thenReturn(clusterResource);
|
when(csContext.getClusterResource()).thenReturn(clusterResource);
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
|
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
|
||||||
"root", queues, queues, TestUtils.spyHook);
|
csConf, null, "root", queues, queues, TestUtils.spyHook);
|
||||||
|
|
||||||
ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
|
ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
|
||||||
when(csContext.getClusterResourceUsage())
|
when(csContext.getClusterResourceUsage())
|
||||||
|
|
|
@ -595,7 +595,8 @@ public class TestApplicationLimitsByPartition {
|
||||||
when(csContext.getMaximumResourceCapability())
|
when(csContext.getMaximumResourceCapability())
|
||||||
.thenReturn(Resources.createResource(16 * GB));
|
.thenReturn(Resources.createResource(16 * GB));
|
||||||
when(csContext.getNonPartitionedQueueComparator())
|
when(csContext.getNonPartitionedQueueComparator())
|
||||||
.thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
.thenReturn(
|
||||||
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
||||||
RMContext rmContext = TestUtils.getMockRMContext();
|
RMContext rmContext = TestUtils.getMockRMContext();
|
||||||
RMContext spyRMContext = spy(rmContext);
|
RMContext spyRMContext = spy(rmContext);
|
||||||
|
@ -614,8 +615,8 @@ public class TestApplicationLimitsByPartition {
|
||||||
when(csContext.getClusterResource()).thenReturn(clusterResource);
|
when(csContext.getClusterResource()).thenReturn(clusterResource);
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
|
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
|
||||||
"root", queues, queues, TestUtils.spyHook);
|
csConf, null, "root", queues, queues, TestUtils.spyHook);
|
||||||
|
|
||||||
ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
|
ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
|
||||||
when(csContext.getClusterResourceUsage())
|
when(csContext.getClusterResourceUsage())
|
||||||
|
|
|
@ -95,11 +95,12 @@ public class TestChildQueueOrder {
|
||||||
when(csContext.getMaximumResourceCapability()).thenReturn(
|
when(csContext.getMaximumResourceCapability()).thenReturn(
|
||||||
Resources.createResource(16*GB, 32));
|
Resources.createResource(16*GB, 32));
|
||||||
when(csContext.getClusterResource()).
|
when(csContext.getClusterResource()).
|
||||||
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
||||||
when(csContext.getNonPartitionedQueueComparator()).
|
when(csContext.getNonPartitionedQueueComparator()).
|
||||||
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
thenReturn(
|
||||||
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getResourceCalculator()).
|
when(csContext.getResourceCalculator()).
|
||||||
thenReturn(resourceComparator);
|
thenReturn(resourceComparator);
|
||||||
when(csContext.getRMContext()).thenReturn(rmContext);
|
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||||
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
}
|
}
|
||||||
|
@ -222,7 +223,7 @@ public class TestChildQueueOrder {
|
||||||
setupSortedQueues(csConf);
|
setupSortedQueues(csConf);
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
|
||||||
|
|
|
@ -175,7 +175,8 @@ public class TestLeafQueue {
|
||||||
when(csContext.getClusterResource()).
|
when(csContext.getClusterResource()).
|
||||||
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
||||||
when(csContext.getNonPartitionedQueueComparator()).
|
when(csContext.getNonPartitionedQueueComparator()).
|
||||||
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
thenReturn(
|
||||||
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getResourceCalculator()).
|
when(csContext.getResourceCalculator()).
|
||||||
thenReturn(resourceCalculator);
|
thenReturn(resourceCalculator);
|
||||||
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
|
@ -188,7 +189,7 @@ public class TestLeafQueue {
|
||||||
containerTokenSecretManager);
|
containerTokenSecretManager);
|
||||||
|
|
||||||
root =
|
root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT,
|
CapacitySchedulerConfiguration.ROOT,
|
||||||
queues, queues,
|
queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
@ -2380,7 +2381,7 @@ public class TestLeafQueue {
|
||||||
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
|
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
|
||||||
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
||||||
CSQueue newRoot =
|
CSQueue newRoot =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT,
|
CapacitySchedulerConfiguration.ROOT,
|
||||||
newQueues, queues,
|
newQueues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
@ -2405,7 +2406,7 @@ public class TestLeafQueue {
|
||||||
.NODE_LOCALITY_DELAY, 60);
|
.NODE_LOCALITY_DELAY, 60);
|
||||||
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
||||||
CSQueue newRoot =
|
CSQueue newRoot =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT,
|
CapacitySchedulerConfiguration.ROOT,
|
||||||
newQueues, queues,
|
newQueues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
|
|
@ -97,10 +97,11 @@ public class TestParentQueue {
|
||||||
when(csContext.getClusterResource()).
|
when(csContext.getClusterResource()).
|
||||||
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
|
||||||
when(csContext.getNonPartitionedQueueComparator()).
|
when(csContext.getNonPartitionedQueueComparator()).
|
||||||
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
|
thenReturn(
|
||||||
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
when(csContext.getResourceCalculator()).
|
when(csContext.getResourceCalculator()).
|
||||||
thenReturn(resourceComparator);
|
thenReturn(resourceComparator);
|
||||||
when(csContext.getRMContext()).thenReturn(rmContext);
|
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,7 +232,7 @@ public class TestParentQueue {
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
|
||||||
|
@ -346,7 +347,7 @@ public class TestParentQueue {
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
boolean exceptionOccured = false;
|
boolean exceptionOccured = false;
|
||||||
try {
|
try {
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
} catch (IllegalArgumentException ie) {
|
} catch (IllegalArgumentException ie) {
|
||||||
|
@ -360,7 +361,7 @@ public class TestParentQueue {
|
||||||
exceptionOccured = false;
|
exceptionOccured = false;
|
||||||
queues.clear();
|
queues.clear();
|
||||||
try {
|
try {
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
} catch (IllegalArgumentException ie) {
|
} catch (IllegalArgumentException ie) {
|
||||||
|
@ -374,7 +375,7 @@ public class TestParentQueue {
|
||||||
exceptionOccured = false;
|
exceptionOccured = false;
|
||||||
queues.clear();
|
queues.clear();
|
||||||
try {
|
try {
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
} catch (IllegalArgumentException ie) {
|
} catch (IllegalArgumentException ie) {
|
||||||
|
@ -467,7 +468,7 @@ public class TestParentQueue {
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
|
||||||
|
@ -623,8 +624,8 @@ public class TestParentQueue {
|
||||||
csConf.setCapacity(Q_B + "." + B3, 0);
|
csConf.setCapacity(Q_B + "." + B3, 0);
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -640,8 +641,8 @@ public class TestParentQueue {
|
||||||
csConf.setCapacity(Q_A, 60);
|
csConf.setCapacity(Q_A, 60);
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -662,8 +663,8 @@ public class TestParentQueue {
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
try {
|
try {
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
fail("Failed to create queues with 0 capacity: " + e);
|
fail("Failed to create queues with 0 capacity: " + e);
|
||||||
|
@ -678,7 +679,7 @@ public class TestParentQueue {
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
|
||||||
|
@ -754,8 +755,8 @@ public class TestParentQueue {
|
||||||
//B3
|
//B3
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
|
|
||||||
// Setup some nodes
|
// Setup some nodes
|
||||||
|
@ -850,12 +851,12 @@ public class TestParentQueue {
|
||||||
|
|
||||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
CSQueue root =
|
CSQueue root =
|
||||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
YarnAuthorizationProvider authorizer =
|
YarnAuthorizationProvider authorizer =
|
||||||
YarnAuthorizationProvider.getInstance(conf);
|
YarnAuthorizationProvider.getInstance(conf);
|
||||||
CapacityScheduler.setQueueAcls(authorizer, queues);
|
CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues);
|
||||||
|
|
||||||
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
UserGroupInformation user = UserGroupInformation.getCurrentUser();
|
||||||
// Setup queue configs
|
// Setup queue configs
|
||||||
|
|
|
@ -134,7 +134,7 @@ public class TestReservations {
|
||||||
when(csContext.getClusterResource()).thenReturn(
|
when(csContext.getClusterResource()).thenReturn(
|
||||||
Resources.createResource(100 * 16 * GB, 100 * 12));
|
Resources.createResource(100 * 16 * GB, 100 * 12));
|
||||||
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
|
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
|
||||||
CapacityScheduler.nonPartitionedQueueComparator);
|
CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
|
||||||
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
|
||||||
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
when(csContext.getRMContext()).thenReturn(rmContext);
|
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||||
|
@ -144,7 +144,7 @@ public class TestReservations {
|
||||||
when(csContext.getContainerTokenSecretManager()).thenReturn(
|
when(csContext.getContainerTokenSecretManager()).thenReturn(
|
||||||
containerTokenSecretManager);
|
containerTokenSecretManager);
|
||||||
|
|
||||||
root = CapacityScheduler.parseQueue(csContext, csConf, null,
|
root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
|
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
|
||||||
|
|
||||||
ResourceUsage queueResUsage = root.getQueueResourceUsage();
|
ResourceUsage queueResUsage = root.getQueueResourceUsage();
|
||||||
|
@ -1180,8 +1180,8 @@ public class TestReservations {
|
||||||
csConf.setBoolean(
|
csConf.setBoolean(
|
||||||
CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
|
CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
|
||||||
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
||||||
CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null,
|
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
|
||||||
CapacitySchedulerConfiguration.ROOT, newQueues, queues,
|
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
queues = newQueues;
|
queues = newQueues;
|
||||||
root.reinitialize(newRoot, cs.getClusterResource());
|
root.reinitialize(newRoot, cs.getClusterResource());
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class TestUtils {
|
||||||
/**
|
/**
|
||||||
* Hook to spy on queues.
|
* Hook to spy on queues.
|
||||||
*/
|
*/
|
||||||
static class SpyHook extends CapacityScheduler.QueueHook {
|
static class SpyHook extends CapacitySchedulerQueueManager.QueueHook {
|
||||||
@Override
|
@Override
|
||||||
public CSQueue hook(CSQueue queue) {
|
public CSQueue hook(CSQueue queue) {
|
||||||
return spy(queue);
|
return spy(queue);
|
||||||
|
|
Loading…
Reference in New Issue