YARN-10907. Minimize usages of AbstractCSQueue#csContext. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2021-12-13 21:57:46 +01:00
parent 898055e204
commit a5bcf4c792
38 changed files with 1010 additions and 736 deletions

View File

@ -95,7 +95,7 @@ protected void addReservationQueue(
PlanQueue planQueue = (PlanQueue)queue;
try {
ReservationQueue resQueue =
new ReservationQueue(cs, currResId, planQueue);
new ReservationQueue(cs.getQueueContext(), currResId, planQueue);
cs.addQueue(resQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
@ -115,7 +115,7 @@ protected void createDefaultReservationQueue(
if (cs.getQueue(defReservationId) == null) {
try {
ReservationQueue defQueue =
new ReservationQueue(cs, defReservationId, planQueue);
new ReservationQueue(cs.getQueueContext(), defReservationId, planQueue);
cs.addQueue(defQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(

View File

@ -35,23 +35,15 @@
* of AbstractManagedParentQueue
*/
public class AbstractAutoCreatedLeafQueue extends AbstractLeafQueue {
protected AbstractManagedParentQueue parent;
public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs,
String queueName, AbstractManagedParentQueue parent, CSQueue old)
throws IOException {
super(cs, queueName, parent, old);
this.parent = parent;
}
private static final Logger LOG = LoggerFactory.getLogger(
AbstractAutoCreatedLeafQueue.class);
public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration leafQueueConfigs, String queueName,
AbstractManagedParentQueue parent, CSQueue old) throws IOException {
super(cs, leafQueueConfigs, queueName, parent, old);
protected AbstractManagedParentQueue parent;
public AbstractAutoCreatedLeafQueue(CapacitySchedulerQueueContext queueContext,
String queueName, AbstractManagedParentQueue parent, CSQueue old)
throws IOException {
super(queueContext, queueName, parent, old);
this.parent = parent;
}
@ -71,7 +63,7 @@ public void setEntitlement(QueueEntitlement entitlement)
@Override
protected Resource getMinimumAbsoluteResource(String queuePath,
String label) {
return super.getMinimumAbsoluteResource(csContext.getConfiguration()
return super.getMinimumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}
@ -79,7 +71,7 @@ protected Resource getMinimumAbsoluteResource(String queuePath,
@Override
protected Resource getMaximumAbsoluteResource(String queuePath,
String label) {
return super.getMaximumAbsoluteResource(csContext.getConfiguration()
return super.getMaximumAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}
@ -87,7 +79,7 @@ protected Resource getMaximumAbsoluteResource(String queuePath,
@Override
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
String label) {
return super.checkConfigTypeIsAbsoluteResource(csContext.getConfiguration()
return super.checkConfigTypeIsAbsoluteResource(queueContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}
@ -122,7 +114,7 @@ public void setEntitlement(String nodeLabel, QueueEntitlement entitlement)
//update queue used capacity etc
CSQueueUtils.updateQueueStatistics(resourceCalculator,
csContext.getClusterResource(),
queueContext.getClusterResource(),
this, labelManager, nodeLabel);
} finally {
writeLock.unlock();

View File

@ -117,7 +117,7 @@ public enum CapacityConfigType {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext;
protected CapacitySchedulerQueueContext queueContext;
protected YarnAuthorizationProvider authorizer = null;
protected ActivitiesManager activitiesManager;
@ -131,33 +131,39 @@ public enum CapacityConfigType {
// is it a dynamic queue?
private boolean dynamicQueue = false;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old);
}
public AbstractCSQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration configuration, String queueName,
public AbstractCSQueue(CapacitySchedulerQueueContext queueContext, String queueName,
CSQueue parent, CSQueue old) {
this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queuePath = createQueuePath(parent, queueName);
this.resourceCalculator = cs.getResourceCalculator();
this.activitiesManager = cs.getActivitiesManager();
this.queueContext = queueContext;
this.resourceCalculator = queueContext.getResourceCalculator();
this.activitiesManager = queueContext.getActivitiesManager();
this.labelManager = queueContext.getLabelManager();
// must be called after parent and queueName is set
CSQueueMetrics metrics = old != null ?
(CSQueueMetrics) old.getMetrics() :
CSQueueMetrics.forQueue(getQueuePath(), parent,
cs.getConfiguration().getEnableUserMetrics(), configuration);
usageTracker = new CSQueueUsageTracker(metrics);
this.csContext = cs;
this.queueAllocationSettings = new QueueAllocationSettings(csContext);
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
queueCapacities = new QueueCapacities(parent == null);
queueContext.getConfiguration().getEnableUserMetrics(), queueContext.getConfiguration());
this.usageTracker = new CSQueueUsageTracker(metrics);
this.queueCapacities = new QueueCapacities(parent == null);
this.queueAllocationSettings = new QueueAllocationSettings(queueContext.getMinimumAllocation());
this.queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
this.resourceTypes = new HashSet<>();
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
this.resourceTypes.add(type.toString().toLowerCase());
}
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
LOG.debug("Initialized {}: name={}, fullname={}", this.getClass().getSimpleName(),
queueName, getQueuePath());
}
private static QueuePath createQueuePath(CSQueue parent, String queueName) {
@ -167,11 +173,6 @@ private static QueuePath createQueuePath(CSQueue parent, String queueName) {
return new QueuePath(parent.getQueuePath(), queueName);
}
@VisibleForTesting
protected void setupConfigurableCapacities() {
setupConfigurableCapacities(csContext.getConfiguration());
}
protected void setupConfigurableCapacities(
CapacitySchedulerConfiguration configuration) {
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
@ -262,6 +263,10 @@ public PrivilegedEntity getPrivilegedEntity() {
return queueEntity;
}
public CapacitySchedulerQueueContext getQueueContext() {
return queueContext;
}
public Set<String> getAccessibleNodeLabels() {
return queueNodeLabelsSettings.getAccessibleNodeLabels();
}
@ -336,26 +341,24 @@ protected void setupQueueConfigs(Resource clusterResource,
// Collect and set the Node label configuration
this.queueNodeLabelsSettings = new QueueNodeLabelsSettings(configuration, parent,
getQueuePath(), csContext);
getQueuePath(), queueContext.getQueueManager().getConfiguredNodeLabelsForAllQueues());
// Initialize the queue capacities
setupConfigurableCapacities(configuration);
updateAbsoluteCapacities();
updateCapacityConfigType();
// Fetch minimum/maximum resource limits for this queue if
// configured
this.resourceTypes = new HashSet<>();
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
resourceTypes.add(type.toString().toLowerCase());
}
updateConfigurableResourceLimits(clusterResource);
// Setup queue's maximumAllocation respecting the global
// and the queue settings
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
parent, csContext);
// TODO remove the getConfiguration() param after the AQC configuration duplication
// removal is resolved
this.queueAllocationSettings.setupMaximumAllocation(configuration,
queueContext.getConfiguration(), getQueuePath(),
parent);
// Initialize the queue state based on previous state, configured state
// and its parent state
@ -369,7 +372,8 @@ protected void setupQueueConfigs(Resource clusterResource,
this.reservationsContinueLooking =
configuration.getReservationContinueLook();
this.configuredCapacityVectors = csContext.getConfiguration()
this.configuredCapacityVectors = queueContext.getConfiguration()
.parseConfiguredResourceVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
@ -378,7 +382,10 @@ protected void setupQueueConfigs(Resource clusterResource,
this, labelManager, null);
// Store preemption settings
this.preemptionSettings = new CSQueuePreemptionSettings(this, csContext, configuration);
// TODO remove the getConfiguration() param after the AQC configuration duplication
// removal is resolved
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration,
queueContext.getConfiguration());
this.priority = configuration.getQueuePriority(
getQueuePath());
@ -409,12 +416,13 @@ protected void setDynamicQueueProperties(
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
parentTemplate = parentTemplate.substring(0, parentTemplate.lastIndexOf(
DOT));
Set<String> parentNodeLabels = csContext
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
Set<String> parentNodeLabels = queueContext.getQueueManager()
.getConfiguredNodeLabelsForAllQueues()
.getLabelsByQueue(parentTemplate);
if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
queueContext.getQueueManager()
.getConfiguredNodeLabelsForAllQueues()
.setLabelsByQueue(getQueuePath(), new HashSet<>(parentNodeLabels));
}
}
@ -436,20 +444,18 @@ private UserWeights getUserWeightsFromHierarchy(
}
protected Resource getMinimumAbsoluteResource(String queuePath, String label) {
Resource minResource = csContext.getConfiguration()
return queueContext.getConfiguration()
.getMinimumResourceRequirement(label, queuePath, resourceTypes);
return minResource;
}
protected Resource getMaximumAbsoluteResource(String queuePath, String label) {
Resource maxResource = csContext.getConfiguration()
return queueContext.getConfiguration()
.getMaximumResourceRequirement(label, queuePath, resourceTypes);
return maxResource;
}
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
String label) {
return csContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
return queueContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
queuePath, resourceTypes);
}
@ -743,7 +749,7 @@ protected void releaseResource(Resource clusterResource,
}
@Private
public boolean getReservationContinueLooking() {
public boolean isReservationsContinueLooking() {
return reservationsContinueLooking;
}
@ -764,7 +770,7 @@ public boolean getPreemptionDisabled() {
@Private
public boolean getIntraQueuePreemptionDisabled() {
return preemptionSettings.getIntraQueuePreemptionDisabled();
return preemptionSettings.isIntraQueuePreemptionDisabled();
}
@Private
@ -1026,12 +1032,12 @@ public Set<String> getNodeLabelsForQueue() {
}
public Resource getTotalKillableResource(String partition) {
return csContext.getPreemptionManager().getKillableResource(getQueuePath(),
return queueContext.getPreemptionManager().getKillableResource(getQueuePath(),
partition);
}
public Iterator<RMContainer> getKillableContainers(String partition) {
return csContext.getPreemptionManager().getKillableContainers(
return queueContext.getPreemptionManager().getKillableContainers(
getQueuePath(),
partition);
}
@ -1383,7 +1389,7 @@ public boolean isInactiveDynamicQueue() {
long idleDurationSeconds =
(Time.monotonicNow() - getLastSubmittedTimestamp())/1000;
return isDynamicQueue() && isEligibleForAutoDeletion() &&
(idleDurationSeconds > this.csContext.getConfiguration().
(idleDurationSeconds > queueContext.getConfiguration().
getAutoExpiredDeletionTime());
}

View File

@ -44,9 +44,9 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
protected AutoCreatedLeafQueueConfig leafQueueTemplate;
protected AutoCreatedQueueManagementPolicy queueManagementPolicy = null;
public AbstractManagedParentQueue(CapacitySchedulerContext cs,
public AbstractManagedParentQueue(CapacitySchedulerQueueContext queueContext,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
super(queueContext, queueName, parent, old);
}
@Override
@ -55,7 +55,7 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
writeLock.lock();
try {
// Set new configs
setupQueueConfigs(clusterResource, csContext.getConfiguration());
setupQueueConfigs(clusterResource, queueContext.getConfiguration());
} finally {
writeLock.unlock();
@ -121,8 +121,7 @@ public CSQueue removeChildQueue(String childQueueName)
CSQueue childQueue;
writeLock.lock();
try {
childQueue = this.csContext.getCapacitySchedulerQueueManager().getQueue(
childQueueName);
childQueue = queueContext.getQueueManager().getQueue(childQueueName);
if (childQueue != null) {
removeChildQueue(childQueue);
} else {
@ -176,14 +175,14 @@ protected CapacitySchedulerConfiguration initializeLeafQueueConfigs(String
CapacitySchedulerConfiguration leafQueueConfigs = new
CapacitySchedulerConfiguration(new Configuration(false), false);
Map<String, String> rtProps = csContext
Map<String, String> rtProps = queueContext
.getConfiguration().getConfigurationProperties()
.getPropertiesWithPrefix(YarnConfiguration.RESOURCE_TYPES + ".", true);
for (Map.Entry<String, String> entry : rtProps.entrySet()) {
leafQueueConfigs.set(entry.getKey(), entry.getValue());
}
Map<String, String> templateConfigs = csContext
Map<String, String> templateConfigs = queueContext
.getConfiguration().getConfigurationProperties()
.getPropertiesWithPrefix(configPrefix, true);

View File

@ -39,16 +39,11 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
private static final Logger LOG = LoggerFactory
.getLogger(AutoCreatedLeafQueue.class);
public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName,
public AutoCreatedLeafQueue(CapacitySchedulerQueueContext queueContext, String queueName,
ManagedParentQueue parent) throws IOException {
// TODO once YARN-10907 is merged the duplicated collection of
// leafQueueConfigs won't be necessary
super(cs, parent.getLeafQueueConfigs(queueName),
queueName,
parent, null);
super.setupQueueConfigs(cs.getClusterResource(), parent.getLeafQueueConfigs(queueName));
super(queueContext, queueName, parent, null);
super.setupQueueConfigs(queueContext.getClusterResource(), parent.getLeafQueueConfigs(queueName));
LOG.debug("Initialized AutoCreatedLeafQueue: name={}, fullname={}", queueName, getQueuePath());
updateCapacitiesToZero();
}
@ -74,8 +69,7 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
}
}
public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig
leafQueueTemplate) throws SchedulerDynamicEditException, IOException {
public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig leafQueueTemplate) {
writeLock.lock();
try {
@ -105,7 +99,7 @@ public void mergeCapacities(QueueCapacities capacities) {
.getAbsoluteMaximumCapacity(nodeLabel));
Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
csContext.getClusterResource());
queueContext.getClusterResource());
getQueueResourceQuotas().setEffectiveMinResource(nodeLabel,
Resources.multiply(resourceByLabel,
queueCapacities.getAbsoluteCapacity(nodeLabel)));
@ -133,12 +127,12 @@ protected void setDynamicQueueProperties(
String parentTemplate = String.format("%s.%s", getParent().getQueuePath(),
CapacitySchedulerConfiguration
.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX);
Set<String> parentNodeLabels = csContext
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
Set<String> parentNodeLabels = queueContext
.getQueueManager().getConfiguredNodeLabelsForAllQueues()
.getLabelsByQueue(parentTemplate);
if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
queueContext.getQueueManager().getConfiguredNodeLabelsForAllQueues()
.setLabelsByQueue(getQueuePath(),
new HashSet<>(parentNodeLabels));
}

View File

@ -27,17 +27,15 @@ public interface AutoCreatedQueueManagementPolicy {
/**
* Initialize policy
* @param schedulerContext Capacity Scheduler context
* @param parentQueue parent queue
*/
void init(CapacitySchedulerContext schedulerContext, ParentQueue
parentQueue) throws IOException;
void init(ParentQueue parentQueue) throws IOException;
/**
* Reinitialize policy state ( if required )
* @param schedulerContext Capacity Scheduler context
* @param parentQueue parent queue
*/
void reinitialize(CapacitySchedulerContext schedulerContext,
ParentQueue parentQueue) throws IOException;
void reinitialize(ParentQueue parentQueue) throws IOException;
/**
* Get initial template for the specified leaf queue

View File

@ -26,11 +26,13 @@ public class CSQueuePreemptionSettings {
public CSQueuePreemptionSettings(
CSQueue queue,
CapacitySchedulerContext csContext,
CapacitySchedulerConfiguration configuration) {
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(queue, csContext, configuration);
CapacitySchedulerConfiguration configuration,
CapacitySchedulerConfiguration originalSchedulerConfiguration) {
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(queue, configuration,
originalSchedulerConfiguration);
this.intraQueuePreemptionDisabledInHierarchy =
isIntraQueueHierarchyPreemptionDisabled(queue, csContext, configuration);
isIntraQueueHierarchyPreemptionDisabled(queue, configuration,
originalSchedulerConfiguration);
}
/**
@ -40,14 +42,14 @@ public CSQueuePreemptionSettings(
* NOTE: Cross-queue preemptability is inherited from a queue's parent.
*
* @param q queue to check preemption state
* @param csContext
* @param configuration capacity scheduler config
* @return true if queue has cross-queue preemption disabled, false otherwise
*/
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
CapacitySchedulerContext csContext, CapacitySchedulerConfiguration configuration) {
CapacitySchedulerConfiguration configuration,
CapacitySchedulerConfiguration originalSchedulerConfiguration) {
boolean systemWidePreemption =
csContext.getConfiguration()
originalSchedulerConfiguration
.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
CSQueue parentQ = q.getParent();
@ -79,14 +81,14 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
* NOTE: Intra-queue preemptability is inherited from a queue's parent.
*
* @param q queue to check intra-queue preemption state
* @param csContext
* @param configuration capacity scheduler config
* @return true if queue has intra-queue preemption disabled, false otherwise
*/
private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
CapacitySchedulerContext csContext, CapacitySchedulerConfiguration configuration) {
CapacitySchedulerConfiguration configuration,
CapacitySchedulerConfiguration originalSchedulerConfiguration) {
boolean systemWideIntraQueuePreemption =
csContext.getConfiguration().getBoolean(
originalSchedulerConfiguration.getBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
CapacitySchedulerConfiguration
.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
@ -109,7 +111,7 @@ private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
parentQ.getIntraQueuePreemptionDisabledInHierarchy());
}
public boolean getIntraQueuePreemptionDisabled() {
public boolean isIntraQueuePreemptionDisabled() {
return intraQueuePreemptionDisabledInHierarchy || preemptionDisabled;
}

View File

@ -183,6 +183,8 @@ public class CapacityScheduler extends
private CapacitySchedulerQueueManager queueManager;
private CapacitySchedulerQueueContext queueContext;
private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr;
// timeout to join when we stop this service
@ -267,6 +269,11 @@ public CapacitySchedulerConfiguration getConfiguration() {
return conf;
}
@Override
public CapacitySchedulerQueueContext getQueueContext() {
return queueContext;
}
@Override
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.rmContext.getContainerTokenSecretManager();
@ -319,6 +326,7 @@ void initScheduler(Configuration configuration) throws
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
this.queueContext = new CapacitySchedulerQueueContext(this);
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
@ -844,6 +852,7 @@ private void initializeQueues(CapacitySchedulerConfiguration conf)
@Lock(CapacityScheduler.class)
private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
throws IOException {
queueContext.reinitialize();
this.queueManager.reinitializeQueues(newConf);
updatePlacementRules();

View File

@ -1658,6 +1658,12 @@ public Map<String, Set<String>> getConfiguredNodeLabelsByQueue() {
return labelsByQueue;
}
public Priority getClusterLevelApplicationMaxPriority() {
return Priority.newInstance(getInt(
YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
YarnConfiguration.DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY));
}
public Integer getDefaultApplicationPriorityConfPerQueue(String queue) {
Integer defaultPriority = getInt(getQueuePrefix(queue)
+ DEFAULT_APPLICATION_PRIORITY,

View File

@ -41,6 +41,8 @@
*/
public interface CapacitySchedulerContext {
CapacitySchedulerConfiguration getConfiguration();
CapacitySchedulerQueueContext getQueueContext();
Resource getMinimumResourceCapability();

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
* Class to store common queue related information, like instances
* to necessary manager classes or the global CapacityScheduler
* configuration.
*/
public class CapacitySchedulerQueueContext {
// Manager classes
private final CapacitySchedulerContext csContext;
private final CapacitySchedulerQueueManager queueManager;
private final RMNodeLabelsManager labelManager;
private final PreemptionManager preemptionManager;
private final ActivitiesManager activitiesManager;
private final ResourceCalculator resourceCalculator;
// CapacityScheduler configuration
private CapacitySchedulerConfiguration configuration;
private Resource minimumAllocation;
public CapacitySchedulerQueueContext(CapacitySchedulerContext csContext) {
this.csContext = csContext;
this.queueManager = csContext.getCapacitySchedulerQueueManager();
this.labelManager = csContext.getRMContext().getNodeLabelManager();
this.preemptionManager = csContext.getPreemptionManager();
this.activitiesManager = csContext.getActivitiesManager();
this.resourceCalculator = csContext.getResourceCalculator();
this.configuration = new CapacitySchedulerConfiguration(csContext.getConfiguration());
this.minimumAllocation = csContext.getMinimumResourceCapability();
}
public void reinitialize() {
// When csConfProvider.loadConfiguration is called, the useLocalConfigurationProvider is
// correctly set to load the config entries from the capacity-scheduler.xml.
// For this reason there is no need to reload from it again.
this.configuration = new CapacitySchedulerConfiguration(csContext.getConfiguration(), false);
this.minimumAllocation = csContext.getMinimumResourceCapability();
}
public CapacitySchedulerQueueManager getQueueManager() {
return queueManager;
}
public RMNodeLabelsManager getLabelManager() {
return labelManager;
}
public PreemptionManager getPreemptionManager() {
return preemptionManager;
}
public ActivitiesManager getActivitiesManager() {
return activitiesManager;
}
public ResourceCalculator getResourceCalculator() {
return resourceCalculator;
}
public CapacitySchedulerConfiguration getConfiguration() {
return configuration;
}
public Resource getMinimumAllocation() {
return minimumAllocation;
}
public Resource getClusterResource() {
return csContext.getClusterResource();
}
public ResourceUsage getClusterResourceUsage() {
return queueManager.getRootQueue().getQueueResourceUsage();
}
public SchedulerHealth getSchedulerHealth() {
return csContext.getSchedulerHealth();
}
public long getLastNodeUpdateTime() {
return csContext.getLastNodeUpdateTime();
}
public FiCaSchedulerNode getNode(NodeId nodeId) {
return csContext.getNode(nodeId);
}
public FiCaSchedulerApp getApplicationAttempt(
ApplicationAttemptId applicationAttemptId) {
return csContext.getApplicationAttempt(applicationAttemptId);
}
// TODO this is used in GuaranteedOrZeroCapacityOverTimePolicy, refactor the comparator there
public RMApp getRMApp(ApplicationId applicationId) {
return csContext.getRMContext().getRMApps().get(applicationId);
}
}

View File

@ -106,6 +106,11 @@ public CSQueue getRootQueue() {
return this.root;
}
@VisibleForTesting
protected void setRootQueue(CSQueue rootQueue) {
this.root = rootQueue;
}
@Override
public Map<String, CSQueue> getQueues() {
return queues.getFullNameQueues();
@ -167,7 +172,7 @@ public void setCapacitySchedulerContext(
public void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
configuredNodeLabels = new ConfiguredNodeLabels(conf);
root = parseQueue(this.csContext, conf, null,
root = parseQueue(this.csContext.getQueueContext(), conf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
setQueueAcls(authorizer, appPriorityACLManager, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
@ -183,7 +188,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
// Parse new queues
CSQueueStore newQueues = new CSQueueStore();
configuredNodeLabels = new ConfiguredNodeLabels(newConf);
CSQueue newRoot = parseQueue(this.csContext, newConf, null,
CSQueue newRoot = parseQueue(this.csContext.getQueueContext(), newConf, null,
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
// When failing over, if using configuration store, don't validate queue
@ -215,7 +220,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
/**
* Parse the queue from the configuration.
* @param csContext the CapacitySchedulerContext
* @param queueContext the CapacitySchedulerQueueContext
* @param conf the CapacitySchedulerConfiguration
* @param parent the parent queue
* @param queueName the queue name
@ -226,7 +231,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
* @throws IOException
*/
static CSQueue parseQueue(
CapacitySchedulerContext csContext,
CapacitySchedulerQueueContext queueContext,
CapacitySchedulerConfiguration conf,
CSQueue parent, String queueName,
CSQueueStore newQueues,
@ -265,7 +270,7 @@ static CSQueue parseQueue(
// Check if the queue will be dynamically managed by the Reservation
// system
if (isReservableQueue) {
queue = new PlanQueue(csContext, queueName, parent,
queue = new PlanQueue(queueContext, queueName, parent,
oldQueues.get(fullQueueName));
//initializing the "internal" default queue, for SLS compatibility
@ -273,7 +278,7 @@ static CSQueue parseQueue(
queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
List<CSQueue> childQueues = new ArrayList<>();
ReservationQueue resQueue = new ReservationQueue(csContext,
ReservationQueue resQueue = new ReservationQueue(queueContext,
defReservationId, (PlanQueue) queue);
try {
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
@ -285,11 +290,11 @@ static CSQueue parseQueue(
newQueues.add(resQueue);
} else if (isAutoCreateEnabled) {
queue = new ManagedParentQueue(csContext, queueName, parent,
queue = new ManagedParentQueue(queueContext, queueName, parent,
oldQueues.get(fullQueueName));
} else{
queue = new LeafQueue(csContext, queueName, parent,
queue = new LeafQueue(queueContext, queueName, parent,
oldQueues.get(fullQueueName));
// Used only for unit tests
queue = hook.hook(queue);
@ -302,10 +307,10 @@ static CSQueue parseQueue(
ParentQueue parentQueue;
if (isAutoCreateEnabled) {
parentQueue = new ManagedParentQueue(csContext, queueName, parent,
parentQueue = new ManagedParentQueue(queueContext, queueName, parent,
oldQueues.get(fullQueueName));
} else{
parentQueue = new ParentQueue(csContext, queueName, parent,
parentQueue = new ParentQueue(queueContext, queueName, parent,
oldQueues.get(fullQueueName));
}
@ -314,7 +319,7 @@ static CSQueue parseQueue(
List<CSQueue> childQueues = new ArrayList<>();
for (String childQueueName : childQueueNames) {
CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName,
CSQueue childQueue = parseQueue(queueContext, conf, queue, childQueueName,
newQueues, oldQueues, hook);
childQueues.add(childQueue);
}
@ -633,7 +638,7 @@ public List<String> determineMissingParents(
* for all queues.
* @return configured node labels
*/
public ConfiguredNodeLabels getConfiguredNodeLabels() {
public ConfiguredNodeLabels getConfiguredNodeLabelsForAllQueues() {
return configuredNodeLabels;
}
@ -676,7 +681,7 @@ private AbstractLeafQueue createLegacyAutoQueue(QueuePath queue)
(ManagedParentQueue) parentQueue;
AutoCreatedLeafQueue autoCreatedLeafQueue =
new AutoCreatedLeafQueue(
csContext, queue.getLeafName(), autoCreateEnabledParentQueue);
csContext.getQueueContext(), queue.getLeafName(), autoCreateEnabledParentQueue);
addLegacyDynamicQueue(autoCreatedLeafQueue);
return autoCreatedLeafQueue;

View File

@ -31,26 +31,16 @@ public class LeafQueue extends AbstractLeafQueue {
private static final Logger LOG =
LoggerFactory.getLogger(LeafQueue.class);
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
public LeafQueue(CapacitySchedulerQueueContext queueContext,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old, false);
this(queueContext, queueName, parent, old, false);
}
public LeafQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration configuration,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, configuration, queueName, parent, old, false);
}
public LeafQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration configuration,
public LeafQueue(CapacitySchedulerQueueContext queueContext,
String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
IOException {
super(cs, configuration, queueName, parent, old, isDynamic);
super(queueContext, queueName, parent, old, isDynamic);
setupQueueConfigs(cs.getClusterResource(), configuration);
LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
setupQueueConfigs(queueContext.getClusterResource(), queueContext.getConfiguration());
}
}

View File

@ -53,23 +53,18 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
private static final Logger LOG = LoggerFactory.getLogger(
ManagedParentQueue.class);
public ManagedParentQueue(final CapacitySchedulerContext cs,
public ManagedParentQueue(final CapacitySchedulerQueueContext queueContext,
final String queueName, final CSQueue parent, final CSQueue old)
throws IOException {
super(cs, queueName, parent, old);
super(queueContext, queueName, parent, old);
shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
csContext.getConfiguration()
queueContext.getConfiguration()
.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
getQueuePath());
leafQueueTemplate = initializeLeafQueueConfigs().build();
LOG.info(
"Created Managed Parent Queue: [{}] with capacity: [{}]"
+ " with max capacity: [{}]",
queueName, super.getCapacity(), super.getMaximumCapacity());
initializeQueueManagementPolicy();
}
@ -82,7 +77,7 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
validate(newlyParsedQueue);
shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
csContext.getConfiguration()
queueContext.getConfiguration()
.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
getQueuePath());
@ -133,23 +128,23 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
private void initializeQueueManagementPolicy() throws IOException {
queueManagementPolicy =
csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
queueContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
getQueuePath());
queueManagementPolicy.init(csContext, this);
queueManagementPolicy.init(this);
}
private void reinitializeQueueManagementPolicy() throws IOException {
AutoCreatedQueueManagementPolicy managementPolicy =
csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
queueContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
getQueuePath());
if (!(managementPolicy.getClass().equals(
this.queueManagementPolicy.getClass()))) {
queueManagementPolicy = managementPolicy;
queueManagementPolicy.init(csContext, this);
queueManagementPolicy.init(this);
} else{
queueManagementPolicy.reinitialize(csContext, this);
queueManagementPolicy.reinitialize(this);
}
}
@ -158,21 +153,25 @@ protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() throws
AutoCreatedLeafQueueConfig.Builder builder =
new AutoCreatedLeafQueueConfig.Builder();
CapacitySchedulerConfiguration configuration =
queueContext.getConfiguration();
// TODO load configs into CapacitySchedulerConfiguration instead of duplicating them
String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
csContext.getConfiguration());
//Load template configuration
CapacitySchedulerConfiguration conf =
configuration);
//Load template configuration into CapacitySchedulerConfiguration
CapacitySchedulerConfiguration autoCreatedTemplateConfig =
super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix);
builder.configuration(conf);
QueuePath templateQueuePath = csContext.getConfiguration()
builder.configuration(autoCreatedTemplateConfig);
QueuePath templateQueuePath = configuration
.getAutoCreatedQueueObjectTemplateConfPrefix(getQueuePath());
Set<String> templateConfiguredNodeLabels = csContext
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
Set<String> templateConfiguredNodeLabels = queueContext
.getQueueManager().getConfiguredNodeLabelsForAllQueues()
.getLabelsByQueue(templateQueuePath.getFullPath());
for (String nodeLabel : templateConfiguredNodeLabels) {
Resource templateMinResource = conf.getMinimumResourceRequirement(
nodeLabel, csContext.getConfiguration()
Resource templateMinResource = autoCreatedTemplateConfig.getMinimumResourceRequirement(
nodeLabel, configuration
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
resourceTypes);
@ -187,7 +186,7 @@ protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() throws
QueueCapacities queueCapacities = new QueueCapacities(false);
CSQueueUtils.loadCapacitiesByLabelsFromConf(templateQueuePath,
queueCapacities,
csContext.getConfiguration(),
configuration,
templateConfiguredNodeLabels);
@ -205,35 +204,38 @@ protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() throws
}
private void updateQueueCapacities(QueueCapacities queueCapacities) {
CapacitySchedulerConfiguration configuration =
queueContext.getConfiguration();
for (String label : queueCapacities.getExistingNodeLabels()) {
queueCapacities.setCapacity(label,
this.csContext.getResourceCalculator().divide(
this.csContext.getClusterResource(),
this.csContext.getConfiguration().getMinimumResourceRequirement(
resourceCalculator.divide(
queueContext.getClusterResource(),
configuration.getMinimumResourceRequirement(
label,
this.csContext.getConfiguration()
configuration
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
resourceTypes),
getQueueResourceQuotas().getConfiguredMinResource(label)));
Resource childMaxResource = this.csContext.getConfiguration()
Resource childMaxResource = configuration
.getMaximumResourceRequirement(label,
this.csContext.getConfiguration()
configuration
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
resourceTypes);
Resource parentMaxRes = getQueueResourceQuotas()
.getConfiguredMaxResource(label);
Resource effMaxResource = Resources.min(
this.csContext.getResourceCalculator(),
this.csContext.getClusterResource(),
resourceCalculator,
queueContext.getClusterResource(),
childMaxResource.equals(Resources.none()) ? parentMaxRes
: childMaxResource,
parentMaxRes);
queueCapacities.setMaximumCapacity(
label, this.csContext.getResourceCalculator().divide(
this.csContext.getClusterResource(),
label, resourceCalculator.divide(
queueContext.getClusterResource(),
effMaxResource,
getQueueResourceQuotas().getConfiguredMaxResource(label)));
@ -268,7 +270,7 @@ public void addChildQueue(CSQueue childQueue)
"Expected child queue to be an instance of AutoCreatedLeafQueue");
}
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
ManagedParentQueue parentQueue =
(ManagedParentQueue) childQueue.getParent();
@ -322,8 +324,8 @@ public void addChildQueue(CSQueue childQueue)
// Do one update cluster resource call to make sure all absolute resources
// effective resources are updated.
updateClusterResource(this.csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
updateClusterResource(queueContext.getClusterResource(),
new ResourceLimits(queueContext.getClusterResource()));
} finally {
writeLock.unlock();
}
@ -427,12 +429,11 @@ public void validateQueueManagementChanges(
+ " Ignoring update " + queueManagementChanges);
}
switch (queueManagementChange.getQueueAction()){
case UPDATE_QUEUE:
if (queueManagementChange.getQueueAction() ==
QueueManagementChange.QueueAction.UPDATE_QUEUE) {
AutoCreatedLeafQueueConfig template =
queueManagementChange.getUpdatedQueueTemplate();
((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
break;
}
}
@ -442,14 +443,13 @@ private void applyQueueManagementChanges(
List<QueueManagementChange> queueManagementChanges)
throws SchedulerDynamicEditException, IOException {
for (QueueManagementChange queueManagementChange : queueManagementChanges) {
switch (queueManagementChange.getQueueAction()){
case UPDATE_QUEUE:
if (queueManagementChange.getQueueAction() ==
QueueManagementChange.QueueAction.UPDATE_QUEUE) {
AutoCreatedLeafQueue childQueueToBeUpdated =
(AutoCreatedLeafQueue) queueManagementChange.getQueue();
//acquires write lock on leaf queue
childQueueToBeUpdated.reinitializeFromTemplate(
queueManagementChange.getUpdatedQueueTemplate());
break;
}
}
}
@ -465,7 +465,7 @@ public CapacitySchedulerConfiguration getLeafQueueConfigs(
CapacitySchedulerConfiguration leafQueueConfigTemplate = new
CapacitySchedulerConfiguration(new Configuration(false), false);
for (final Iterator<Map.Entry<String, String>> iterator =
templateConfig.iterator(); iterator.hasNext(); ) {
templateConfig.iterator(); iterator.hasNext();) {
Map.Entry<String, String> confKeyValuePair = iterator.next();
final String name = confKeyValuePair.getKey().replaceFirst(
CapacitySchedulerConfiguration

View File

@ -87,7 +87,6 @@ public class ParentQueue extends AbstractCSQueue {
protected final List<CSQueue> childQueues;
private final boolean rootQueue;
private volatile int numApplications;
private final CapacitySchedulerContext scheduler;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -108,27 +107,20 @@ public class ParentQueue extends AbstractCSQueue {
// after every time recalculation
private volatile Map<String, Float> effectiveMinRatioPerResource;
public ParentQueue(CapacitySchedulerContext cs,
public ParentQueue(CapacitySchedulerQueueContext queueContext,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this(cs, cs.getConfiguration(), queueName, parent, old, false);
}
private ParentQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration csConf, String queueName,
CSQueue parent,
CSQueue old) throws IOException {
this(cs, csConf, queueName, parent, old, false);
this(queueContext, queueName, parent, old, false);
}
private ParentQueue(CapacitySchedulerContext cs,
CapacitySchedulerConfiguration csConf, String queueName, CSQueue parent,
CSQueue old, boolean isDynamic)
private ParentQueue(CapacitySchedulerQueueContext queueContext,
String queueName, CSQueue parent, CSQueue old, boolean isDynamic)
throws IOException {
super(cs, queueName, parent, old);
super(queueContext, queueName, parent, old);
setDynamicQueue(isDynamic);
this.scheduler = cs;
this.rootQueue = (parent == null);
float rawCapacity = csConf.getNonLabeledQueueCapacity(this.queuePath);
float rawCapacity = queueContext.getConfiguration()
.getNonLabeledQueueCapacity(this.queuePath);
if (rootQueue &&
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
@ -139,13 +131,10 @@ private ParentQueue(CapacitySchedulerContext cs,
this.childQueues = new ArrayList<>();
this.allowZeroCapacitySum =
cs.getConfiguration().getAllowZeroCapacitySum(getQueuePath());
queueContext.getConfiguration()
.getAllowZeroCapacitySum(getQueuePath());
setupQueueConfigs(cs.getClusterResource(), csConf);
LOG.info("Initialized parent-queue " + queueName +
" name=" + queueName +
", fullname=" + getQueuePath());
setupQueueConfigs(queueContext.getClusterResource(), queueContext.getConfiguration());
}
// returns what is configured queue ordering policy
@ -156,41 +145,42 @@ private String getQueueOrderingPolicyConfigName() {
}
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration csConf)
CapacitySchedulerConfiguration configuration)
throws IOException {
writeLock.lock();
try {
autoCreatedQueueTemplate = new AutoCreatedQueueTemplate(
csConf, this.queuePath);
super.setupQueueConfigs(clusterResource, csConf);
configuration, this.queuePath);
super.setupQueueConfigs(clusterResource, configuration);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
for (Map.Entry<AccessType, AccessControlList> e : getACLs().entrySet()) {
aclsString.append(e.getKey()).append(":")
.append(e.getValue().getAclString());
}
StringBuilder labelStrBuilder = new StringBuilder();
if (queueNodeLabelsSettings.getAccessibleNodeLabels() != null) {
for (String nodeLabel : queueNodeLabelsSettings.getAccessibleNodeLabels()) {
if (getAccessibleNodeLabels() != null) {
for (String nodeLabel : getAccessibleNodeLabels()) {
labelStrBuilder.append(nodeLabel).append(",");
}
}
// Initialize queue ordering policy
queueOrderingPolicy = csConf.getQueueOrderingPolicy(
queueOrderingPolicy = configuration.getQueueOrderingPolicy(
getQueuePath(), parent == null ?
null :
((ParentQueue) parent).getQueueOrderingPolicyConfigName());
queueOrderingPolicy.setQueues(childQueues);
LOG.info(getQueueName() + ", " + getCapacityOrWeightString()
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities
.getAbsoluteMaximumCapacity() + ", state=" + getState() + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", reservationsContinueLooking=" + reservationsContinueLooking
+ ", absoluteCapacity=" + getAbsoluteCapacity()
+ ", maxCapacity=" + getMaximumCapacity()
+ ", absoluteMaxCapacity=" + getAbsoluteMaximumCapacity()
+ ", state=" + getState() + ", acls="
+ aclsString + ", labels=" + labelStrBuilder + "\n"
+ ", reservationsContinueLooking=" + isReservationsContinueLooking()
+ ", orderingPolicy=" + getQueueOrderingPolicyConfigName()
+ ", priority=" + priority
+ ", priority=" + getPriority()
+ ", allowZeroCapacitySum=" + allowZeroCapacitySum);
} finally {
writeLock.unlock();
@ -325,7 +315,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
.getConfiguredMinResource(nodeLabel));
}
Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
scheduler.getClusterResource());
queueContext.getClusterResource());
Resource parentMinResource =
usageTracker.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel);
if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
@ -488,11 +478,10 @@ private CSQueue createNewQueue(String childQueuePath, boolean isLeaf)
childQueuePath.lastIndexOf(".") + 1);
if (isLeaf) {
childQueue = new LeafQueue(csContext, csContext.getConfiguration(),
childQueue = new LeafQueue(queueContext,
queueShortName, this, null, true);
} else{
childQueue = new ParentQueue(csContext, csContext.getConfiguration(),
queueShortName, this, null, true);
childQueue = new ParentQueue(queueContext, queueShortName, this, null, true);
}
childQueue.setDynamicQueue(true);
// It should be sufficient now, we don't need to set more, because weights
@ -523,7 +512,7 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf)
// should not happen, since it will be handled before calling this method)
// , but we will move on.
CSQueue queue =
csContext.getCapacitySchedulerQueueManager().getQueueByFullName(
queueContext.getQueueManager().getQueueByFullName(
childQueuePath);
if (queue != null) {
LOG.warn(
@ -533,7 +522,7 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf)
}
// Check if the max queue limit is exceeded.
int maxQueues = csContext.getConfiguration().
int maxQueues = queueContext.getConfiguration().
getAutoCreatedQueuesV2MaxChildQueuesLimit(getQueuePath());
if (childQueues.size() >= maxQueues) {
throw new SchedulerDynamicEditException(
@ -564,8 +553,8 @@ private CSQueue addDynamicChildQueue(String childQueuePath, boolean isLeaf)
// Call updateClusterResource.
// Which will deal with all effectiveMin/MaxResource
// Calculation
this.updateClusterResource(csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
this.updateClusterResource(queueContext.getClusterResource(),
new ResourceLimits(queueContext.getClusterResource()));
return newQueue;
} finally {
@ -596,14 +585,14 @@ public void removeChildQueue(CSQueue queue)
// Now we can do remove and update
this.childQueues.remove(queue);
this.scheduler.getCapacitySchedulerQueueManager()
queueContext.getQueueManager()
.removeQueue(queue.getQueuePath());
// Call updateClusterResource,
// which will deal with all effectiveMin/MaxResource
// Calculation
this.updateClusterResource(csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
this.updateClusterResource(queueContext.getClusterResource(),
new ResourceLimits(queueContext.getClusterResource()));
} finally {
writeLock.unlock();
@ -617,7 +606,7 @@ public void removeChildQueue(CSQueue queue)
* false otherwise
*/
public boolean isEligibleForAutoQueueCreation() {
return isDynamicQueue() || csContext.getConfiguration().
return isDynamicQueue() || queueContext.getConfiguration().
isAutoQueueCreationV2Enabled(getQueuePath());
}
@ -644,7 +633,7 @@ public void reinitialize(CSQueue newlyParsedQueue,
ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;
// Set new configs
setupQueueConfigs(clusterResource, csContext.getConfiguration());
setupQueueConfigs(clusterResource, queueContext.getConfiguration());
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
@ -685,7 +674,7 @@ public void reinitialize(CSQueue newlyParsedQueue,
currentChildQueues.put(newChildQueueName, newChildQueue);
// inform CapacitySchedulerQueueManager
CapacitySchedulerQueueManager queueManager =
this.csContext.getCapacitySchedulerQueueManager();
queueContext.getQueueManager();
queueManager.addQueue(newChildQueueName, newChildQueue);
continue;
}
@ -1399,7 +1388,7 @@ public void recoverContainer(Resource clusterResource,
// Careful! Locking order is important!
writeLock.lock();
try {
FiCaSchedulerNode node = scheduler.getNode(
FiCaSchedulerNode node = queueContext.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource,
rmContainer.getContainer().getResource(), node.getPartition());
@ -1437,7 +1426,7 @@ public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
queueContext.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, rmContainer.getContainer()
.getResource(), node.getPartition());
LOG.info("movedContainer" + " queueMoveIn=" + getQueuePath()
@ -1456,7 +1445,7 @@ public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
queueContext.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(),
node.getPartition());
@ -1543,9 +1532,9 @@ private void killContainersToEnforceMaxQueueCapacity(String partition,
while (Resources.greaterThan(resourceCalculator, partitionResource,
usageTracker.getQueueUsage().getUsed(partition), maxResource)) {
RMContainer toKillContainer = killableContainerIter.next();
FiCaSchedulerApp attempt = csContext.getApplicationAttempt(
FiCaSchedulerApp attempt = queueContext.getApplicationAttempt(
toKillContainer.getContainerId().getApplicationAttemptId());
FiCaSchedulerNode node = csContext.getNode(
FiCaSchedulerNode node = queueContext.getNode(
toKillContainer.getAllocatedNode());
if (null != attempt && null != node) {
AbstractLeafQueue lq = attempt.getCSLeafQueue();
@ -1656,7 +1645,7 @@ Map<String, Float> getEffectiveMinRatioPerResource() {
@Override
public boolean isEligibleForAutoDeletion() {
return isDynamicQueue() && getChildQueues().size() == 0 &&
csContext.getConfiguration().
queueContext.getConfiguration().
isAutoExpiredDeletionEnabled(this.getQueuePath());
}

View File

@ -40,17 +40,15 @@ public class PlanQueue extends AbstractManagedParentQueue {
private int maxAppsPerUserForReservation;
private float userLimit;
private float userLimitFactor;
protected CapacitySchedulerContext schedulerContext;
private boolean showReservationsAsQueues;
public PlanQueue(CapacitySchedulerContext cs, String queueName,
public PlanQueue(CapacitySchedulerQueueContext queueContext, String queueName,
CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
super(queueContext, queueName, parent, old);
updateAbsoluteCapacities();
this.schedulerContext = cs;
// Set the reservation queue attributes for the Plan
CapacitySchedulerConfiguration conf = cs.getConfiguration();
CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
String queuePath = super.getQueuePath();
int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
@ -106,7 +104,7 @@ public void reinitialize(CSQueue newlyParsedQueue,
}
// Set new configs
setupQueueConfigs(clusterResource, csContext.getConfiguration());
setupQueueConfigs(clusterResource, queueContext.getConfiguration());
updateQuotas(newlyParsedParentQueue.userLimit,
newlyParsedParentQueue.userLimitFactor,

View File

@ -32,12 +32,13 @@ public class QueueAllocationSettings {
private final Resource minimumAllocation;
private Resource maximumAllocation;
public QueueAllocationSettings(CapacitySchedulerContext csContext) {
this.minimumAllocation = csContext.getMinimumResourceCapability();
public QueueAllocationSettings(Resource minimumAllocation) {
this.minimumAllocation = minimumAllocation;
}
void setupMaximumAllocation(CapacitySchedulerConfiguration csConf, String queuePath,
CSQueue parent, CapacitySchedulerContext csContext) {
void setupMaximumAllocation(CapacitySchedulerConfiguration configuration,
CapacitySchedulerConfiguration originalSchedulerConfiguration, String queuePath,
CSQueue parent) {
/* YARN-10869: When using AutoCreatedLeafQueues, the passed configuration
* object is a cloned one containing only the template configs
* (see ManagedParentQueue#getLeafQueueConfigs). To ensure that the actual
@ -45,8 +46,8 @@ void setupMaximumAllocation(CapacitySchedulerConfiguration csConf, String queueP
* be used.
*/
Resource clusterMax = ResourceUtils
.fetchMaximumAllocationFromConfig(csContext.getConfiguration());
Resource queueMax = csConf.getQueueMaximumAllocation(queuePath);
.fetchMaximumAllocationFromConfig(originalSchedulerConfiguration);
Resource queueMax = configuration.getQueueMaximumAllocation(queuePath);
maximumAllocation = Resources.clone(
parent == null ? clusterMax : parent.getMaximumAllocation());
@ -59,8 +60,8 @@ void setupMaximumAllocation(CapacitySchedulerConfiguration csConf, String queueP
if (queueMax == Resources.none()) {
// Handle backward compatibility
long queueMemory = csConf.getQueueMaximumAllocationMb(queuePath);
int queueVcores = csConf.getQueueMaximumAllocationVcores(queuePath);
long queueMemory = configuration.getQueueMaximumAllocationMb(queuePath);
int queueVcores = configuration.getQueueMaximumAllocationVcores(queuePath);
if (queueMemory != UNDEFINED) {
maximumAllocation.setMemorySize(queueMemory);
}

View File

@ -31,7 +31,6 @@
public class QueueNodeLabelsSettings {
private final CSQueue parent;
private final String queuePath;
private final CapacitySchedulerContext csContext;
private Set<String> accessibleLabels;
private Set<String> configuredNodeLabels;
private String defaultLabelExpression;
@ -39,18 +38,18 @@ public class QueueNodeLabelsSettings {
public QueueNodeLabelsSettings(CapacitySchedulerConfiguration configuration,
CSQueue parent,
String queuePath,
CapacitySchedulerContext csContext) throws IOException {
ConfiguredNodeLabels configuredNodeLabels) throws IOException {
this.parent = parent;
this.queuePath = queuePath;
this.csContext = csContext;
initializeNodeLabels(configuration);
initializeNodeLabels(configuration, configuredNodeLabels);
}
private void initializeNodeLabels(CapacitySchedulerConfiguration configuration)
private void initializeNodeLabels(CapacitySchedulerConfiguration configuration,
ConfiguredNodeLabels configuredNodeLabels)
throws IOException {
initializeAccessibleLabels(configuration);
initializeDefaultLabelExpression(configuration);
initializeConfiguredNodeLabels();
initializeConfiguredNodeLabels(configuration, configuredNodeLabels);
validateNodeLabels();
}
@ -73,19 +72,17 @@ private void initializeDefaultLabelExpression(CapacitySchedulerConfiguration con
}
}
private void initializeConfiguredNodeLabels() {
if (csContext.getCapacitySchedulerQueueManager() != null
&& csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels() != null) {
private void initializeConfiguredNodeLabels(CapacitySchedulerConfiguration configuration,
ConfiguredNodeLabels configuredNodeLabelsParam) {
if (configuredNodeLabelsParam != null) {
if (queuePath.equals(ROOT)) {
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
.getConfiguredNodeLabels().getAllConfiguredLabels();
this.configuredNodeLabels = configuredNodeLabelsParam.getAllConfiguredLabels();
} else {
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
.getConfiguredNodeLabels().getLabelsByQueue(queuePath);
this.configuredNodeLabels = configuredNodeLabelsParam.getLabelsByQueue(queuePath);
}
} else {
// Fallback to suboptimal but correct logic
this.configuredNodeLabels = csContext.getConfiguration().getConfiguredNodeLabels(queuePath);
this.configuredNodeLabels = configuration.getConfiguredNodeLabels(queuePath);
}
}

View File

@ -31,20 +31,17 @@
*
*/
public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
private static final Logger LOG = LoggerFactory
.getLogger(ReservationQueue.class);
private static final Logger LOG =
LoggerFactory.getLogger(ReservationQueue.class);
private PlanQueue parent;
public ReservationQueue(CapacitySchedulerContext cs, String queueName,
public ReservationQueue(CapacitySchedulerQueueContext queueContext, String queueName,
PlanQueue parent) throws IOException {
super(cs, queueName, parent, null);
super.setupQueueConfigs(cs.getClusterResource(),
cs.getConfiguration());
super(queueContext, queueName, parent, null);
super.setupQueueConfigs(queueContext.getClusterResource(),
queueContext.getConfiguration());
LOG.debug("Initialized ReservationQueue: name={}, fullname={}",
queueName, getQueuePath());
// the following parameters are common to all reservation in the plan
updateQuotas(parent.getUserLimitForReservation(),
parent.getUserLimitFactor(),

View File

@ -58,7 +58,6 @@ public class UsersManager implements AbstractUsersManager {
private final AbstractLeafQueue lQueue;
private final RMNodeLabelsManager labelManager;
private final ResourceCalculator resourceCalculator;
private final CapacitySchedulerContext scheduler;
private Map<String, User> users = new ConcurrentHashMap<>();
private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage();
@ -296,17 +295,13 @@ public void setWeight(float weight) {
* Leaf Queue Object
* @param labelManager
* Label Manager instance
* @param scheduler
* Capacity Scheduler Context
* @param resourceCalculator
* rc
*/
public UsersManager(QueueMetrics metrics, AbstractLeafQueue lQueue,
RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
ResourceCalculator resourceCalculator) {
RMNodeLabelsManager labelManager, ResourceCalculator resourceCalculator) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.lQueue = lQueue;
this.scheduler = scheduler;
this.labelManager = labelManager;
this.resourceCalculator = resourceCalculator;
this.qUsageRatios = new UsageRatios();
@ -844,10 +839,8 @@ partitionResource, getUsageRatio(nodePartition),
/**
* Update new usage ratio.
*
* @param partition
* Node partition
* @param clusterResource
* Cluster Resource
* @param partition Node partition
* @param clusterResource cluster resource
*/
public void updateUsageRatio(String partition, Resource clusterResource) {
writeLock.lock();
@ -1064,6 +1057,8 @@ private ResourceUsage getTotalResourceUsagePerUser(String userName) {
* Name of the user
* @param resource
* Resource to increment/decrement
* @param clusterResource
* Cluster resource (for testing purposes only)
* @param nodePartition
* Node label
* @param isAllocate
@ -1071,6 +1066,7 @@ private ResourceUsage getTotalResourceUsagePerUser(String userName) {
* @return user
*/
public User updateUserResourceUsage(String userName, Resource resource,
Resource clusterResource,
String nodePartition, boolean isAllocate) {
this.writeLock.lock();
try {
@ -1086,7 +1082,7 @@ public User updateUserResourceUsage(String userName, Resource resource,
// Update usage ratios
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
scheduler.getClusterResource());
clusterResource);
incQueueUsageRatio(nodePartition, user.updateUsageRatio(
resourceCalculator, resourceByLabel, nodePartition));

View File

@ -80,7 +80,7 @@ private boolean checkHeadroom(ResourceLimits currentResourceLimits,
// require
Resource resourceCouldBeUnReserved =
application.getAppAttemptResourceUsage().getReserved(nodePartition);
if (!application.getCSLeafQueue().getReservationContinueLooking()) {
if (!application.getCSLeafQueue().isReservationsContinueLooking()) {
// If we don't allow reservation continuous looking,
// we won't allow to unreserve before allocation.
resourceCouldBeUnReserved = Resources.none();
@ -154,7 +154,7 @@ private ContainerAllocation preCheckForNodeCandidateSet(FiCaSchedulerNode node,
return ContainerAllocation.PRIORITY_SKIPPED;
}
if (!application.getCSLeafQueue().getReservationContinueLooking()) {
if (!application.getCSLeafQueue().isReservationsContinueLooking()) {
if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) {
LOG.debug("doesn't need containers based on reservation algo!");
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
@ -551,7 +551,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
RMContainer unreservedContainer = null;
boolean reservationsContinueLooking =
application.getCSLeafQueue().getReservationContinueLooking();
application.getCSLeafQueue().isReservationsContinueLooking();
// Check if we need to kill some containers to allocate this one
List<RMContainer> toKillContainers = null;

View File

@ -70,7 +70,6 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
implements AutoCreatedQueueManagementPolicy {
private static final int DEFAULT_QUEUE_PRINT_SIZE_LIMIT = 25;
private CapacitySchedulerContext scheduler;
private ManagedParentQueue managedParentQueue;
private static final Logger LOG =
@ -263,9 +262,9 @@ private class PendingApplicationComparator
@Override
public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
RMApp rmApp1 = scheduler.getRMContext().getRMApps().get(
RMApp rmApp1 = managedParentQueue.getQueueContext().getRMApp(
app1.getApplicationId());
RMApp rmApp2 = scheduler.getRMContext().getRMApps().get(
RMApp rmApp2 = managedParentQueue.getQueueContext().getRMApp(
app2.getApplicationId());
if (rmApp1 != null && rmApp2 != null) {
return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime());
@ -283,10 +282,7 @@ public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
new PendingApplicationComparator();
@Override
public void init(final CapacitySchedulerContext schedulerContext,
final ParentQueue parentQueue) throws IOException {
this.scheduler = schedulerContext;
public void init(final ParentQueue parentQueue) throws IOException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@ -372,7 +368,7 @@ public List<QueueManagementChange> computeQueueManagementChanges()
//Populate new entitlements
return leafQueueEntitlements.mapToQueueManagementChanges((leafQueueName, capacities) -> {
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
(AutoCreatedLeafQueue) managedParentQueue.getQueueContext().getQueueManager()
.getQueue(leafQueueName);
AutoCreatedLeafQueueConfig newTemplate = buildTemplate(capacities);
return new QueueManagementChange.UpdateQueue(leafQueue, newTemplate);
@ -651,7 +647,8 @@ public void commitQueueManagementChanges(
.mergeCapacities(updatedQueueTemplate.getQueueCapacities());
leafQueue.getQueueResourceQuotas()
.setConfiguredMinResource(Resources.multiply(
this.scheduler.getClusterResource(), updatedQueueTemplate
managedParentQueue.getQueueContext().getClusterResource(),
updatedQueueTemplate
.getQueueCapacities().getCapacity(nodeLabel)));
deactivate(leafQueue, nodeLabel);
}
@ -693,8 +690,7 @@ public boolean hasPendingApps(final AutoCreatedLeafQueue leafQueue) {
}
@Override
public void reinitialize(CapacitySchedulerContext schedulerContext,
final ParentQueue parentQueue) throws IOException {
public void reinitialize(final ParentQueue parentQueue) throws IOException {
if (!(parentQueue instanceof ManagedParentQueue)) {
throw new IllegalStateException(
"Expected instance of type " + ManagedParentQueue.class + " found "

View File

@ -388,7 +388,7 @@ public void testQueueSubmitWithACLsEnabledWithQueueMappingForAutoCreatedQueue()
MockRM newMockRM = new MockRM(csConf);
CapacityScheduler cs =
((CapacityScheduler) newMockRM.getResourceScheduler());
ManagedParentQueue managedParentQueue = new ManagedParentQueue(cs,
ManagedParentQueue managedParentQueue = new ManagedParentQueue(cs.getQueueContext(),
"managedparent", cs.getQueue("root"), null);
cs.getCapacitySchedulerQueueManager().addQueue("managedparent",
managedParentQueue);

View File

@ -215,7 +215,7 @@ public void testSimpleMinMaxResourceConfigurartionPerQueue()
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(QUEUED);
AutoCreatedLeafQueue d1 = new AutoCreatedLeafQueue(cs, "d1", parentQueue);
AutoCreatedLeafQueue d1 = new AutoCreatedLeafQueue(cs.getQueueContext(), "d1", parentQueue);
cs.addQueue(d1);
/**
@ -240,7 +240,7 @@ public void testSimpleMinMaxResourceConfigurartionPerQueue()
* d1 will occupy all entire resource
* of Managed Parent queue.
*/
AutoCreatedLeafQueue d2 = new AutoCreatedLeafQueue(cs, "d2", parentQueue);
AutoCreatedLeafQueue d2 = new AutoCreatedLeafQueue(cs.getQueueContext(), "d2", parentQueue);
cs.addQueue(d2);
cs.getRootQueue().updateClusterResource(cs.getClusterResource(),

View File

@ -107,19 +107,11 @@ public void setUp() throws IOException {
rmContext = TestUtils.getMockRMContext();
Resource clusterResource = Resources.createResource(10 * 16 * GB, 10 * 32);
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(clusterResource);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
CapacitySchedulerContext csContext = createCSContext(csConf, resourceCalculator,
Resources.createResource(GB, 1), Resources.createResource(16*GB, 32),
clusterResource);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(csContext);
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
@ -129,13 +121,13 @@ public void setUp() throws IOException {
CSQueueStore queues = new CSQueueStore();
root = CapacitySchedulerQueueManager
.parseQueue(csContext, csConf, null, "root",
.parseQueue(queueContext, csConf, null, "root",
queues, queues,
TestUtils.spyHook);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
queue = spy(new LeafQueue(csContext, A, root, null));
queue = spy(new LeafQueue(queueContext, A, root, null));
QueueResourceQuotas queueResourceQuotas = ((LeafQueue) queues.get(A))
.getQueueResourceQuotas();
doReturn(queueResourceQuotas).when(queue).getQueueResourceQuotas();
@ -278,28 +270,21 @@ public void testLimitsComputation() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 16));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Say cluster has 100 nodes of 16G each
Resource clusterResource =
Resources.createResource(100 * 16 * GB, 100 * 16);
when(csContext.getClusterResource()).thenReturn(clusterResource);
CapacitySchedulerContext csContext = createCSContext(csConf, resourceCalculator, Resources.createResource(GB, 1),
Resources.createResource(16*GB, 16), clusterResource);
CapacitySchedulerQueueManager queueManager = csContext.getCapacitySchedulerQueueManager();
CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(csContext);
CSQueueStore queues = new CSQueueStore();
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
"root", queues, queues, TestUtils.spyHook);
queueManager.setRootQueue(root);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
@ -367,12 +352,14 @@ public void testLimitsComputation() throws Exception {
// Change the per-queue max AM resources percentage.
csConf.setFloat(PREFIX + queue.getQueuePath()
+ ".maximum-am-resource-percent", 0.5f);
queueContext.reinitialize();
// Re-create queues to get new configs.
queues = new CSQueueStore();
root = CapacitySchedulerQueueManager.parseQueue(
csContext, csConf, null, "root",
queueContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
clusterResource = Resources.createResource(100 * 16 * GB);
queueManager.setRootQueue(root);
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
@ -391,10 +378,11 @@ public void testLimitsComputation() throws Exception {
// Change the per-queue max applications.
csConf.setInt(PREFIX + queue.getQueuePath() + ".maximum-applications",
9999);
queueContext.reinitialize();
// Re-create queues to get new configs.
queues = new CSQueueStore();
root = CapacitySchedulerQueueManager.parseQueue(
csContext, csConf, null, "root",
queueContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
@ -587,26 +575,19 @@ public void testHeadroom() throws Exception {
new CapacitySchedulerConfiguration();
csConf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 25);
setupQueueConfiguration(csConf);
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB);
when(csContext.getClusterResource()).thenReturn(clusterResource);
CapacitySchedulerContext csContext = createCSContext(csConf, resourceCalculator, Resources.createResource(GB),
Resources.createResource(16*GB), clusterResource);
CapacitySchedulerQueueManager queueManager = csContext.getCapacitySchedulerQueueManager();
CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(csContext);
CSQueueStore queues = new CSQueueStore();
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(queueContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
queueManager.setRootQueue(rootQueue);
rootQueue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
@ -952,27 +933,17 @@ public void testAMResourceLimitWithDRCAndFullParent() throws Exception {
setupQueueConfiguration(csConf);
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.3f);
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getResourceCalculator()).
thenReturn(new DominantResourceCalculator());
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Total cluster resources.
Resource clusterResource = Resources.createResource(100 * GB, 1000);
when(csContext.getClusterResource()).thenReturn(clusterResource);
CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(
createCSContext(csConf, new DominantResourceCalculator(), Resources.createResource(GB),
Resources.createResource(16*GB), clusterResource));
// Set up queue hierarchy.
CSQueueStore queues = new CSQueueStore();
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(queueContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
rootQueue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
@ -1015,4 +986,32 @@ public void testAMResourceLimitWithDRCAndFullParent() throws Exception {
+ amLimit.getVirtualCores(),
amLimit.getVirtualCores() >= expectedAmLimit.getVirtualCores());
}
private CapacitySchedulerContext createCSContext(CapacitySchedulerConfiguration csConf,
ResourceCalculator rc, Resource minResource, Resource maxResource, Resource clusterResource) {
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getMinimumResourceCapability()).
thenReturn(minResource);
when(csContext.getMaximumResourceCapability()).
thenReturn(maxResource);
when(csContext.getResourceCalculator()).
thenReturn(rc);
CapacitySchedulerQueueManager queueManager = new CapacitySchedulerQueueManager(conf,
rmContext.getNodeLabelManager(), null);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(queueManager);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Total cluster resources.
when(csContext.getClusterResource()).thenReturn(clusterResource);
return csContext;
}
}

View File

@ -777,6 +777,12 @@ public void testHeadroom() throws Exception {
when(spyRMContext.getNodeLabelManager()).thenReturn(mgr);
when(csContext.getRMContext()).thenReturn(spyRMContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
CapacitySchedulerQueueManager queueManager =
new CapacitySchedulerQueueManager(csConf, mgr, null);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(queueManager);
// Setup nodelabels
queueManager.reinitConfiguredNodeLabels(csConf);
mgr.activateNode(NodeId.newInstance("h0", 0),
Resource.newInstance(160 * GB, 16)); // default Label
@ -789,16 +795,15 @@ public void testHeadroom() throws Exception {
Resource clusterResource = Resources.createResource(160 * GB);
when(csContext.getClusterResource()).thenReturn(clusterResource);
CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(csContext);
CSQueueStore queues = new CSQueueStore();
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(queueContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
queueManager.setRootQueue(rootQueue);
rootQueue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
.thenReturn(queueResUsage);
// Manipulate queue 'a'
LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue) queues.get("b2"));
queue.updateClusterResource(clusterResource,

View File

@ -92,9 +92,13 @@ public void setup() throws IOException {
when(preemptionManager.getKillableResource(any(), anyString()))
.thenReturn(Resource.newInstance(0, 0));
when(scheduler.getPreemptionManager()).thenReturn(preemptionManager);
when(scheduler.getActivitiesManager()).thenReturn(activitiesManager);
queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager,
appPriorityACLManager);
queueManager.setCapacitySchedulerContext(scheduler);
when(scheduler.getCapacitySchedulerQueueManager()).thenReturn(queueManager);
CapacitySchedulerQueueContext queueContext = new CapacitySchedulerQueueContext(scheduler);
when(scheduler.getQueueContext()).thenReturn(queueContext);
queueManager.initializeQueues(csConfig);
}

View File

@ -39,6 +39,7 @@ public class TestCSQueueStore {
private CSQueue root;
private CapacitySchedulerContext csContext;
private CapacitySchedulerQueueContext queueContext;
@Before
public void setUp() throws IOException {
@ -62,22 +63,26 @@ public void setUp() throws IOException {
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
new CapacitySchedulerQueueManager(csConf, null, null));
queueContext = new CapacitySchedulerQueueContext(csContext);
CSQueueStore queues = new CSQueueStore();
root = CapacitySchedulerQueueManager
.parseQueue(csContext, csConf, null, "root",
.parseQueue(queueContext, csConf, null, "root",
queues, queues,
TestUtils.spyHook);
}
public CSQueue createLeafQueue(String name, CSQueue parent)
throws IOException {
return new LeafQueue(csContext, name, parent, null);
return new LeafQueue(queueContext, name, parent, null);
}
public CSQueue createParentQueue(String name, CSQueue parent)
throws IOException {
return new ParentQueue(csContext, name, parent, null);
return new ParentQueue(queueContext, name, parent, null);
}
/**

View File

@ -680,7 +680,7 @@ public void testAutoCreationFailsWhenParentCapacityExceeded()
// Test add one auto created queue dynamically and manually modify
// capacity
ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c");
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1",
AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS.getQueueContext(), "c1",
parentQueue);
newCS.addQueue(c1);
c1.setCapacity(0.5f);
@ -689,13 +689,13 @@ public void testAutoCreationFailsWhenParentCapacityExceeded()
setEntitlement(c1, new QueueEntitlement(0.5f, 1f));
AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2",
AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS.getQueueContext(), "c2",
parentQueue);
newCS.addQueue(c2);
setEntitlement(c2, new QueueEntitlement(0.5f, 1f));
try {
AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3",
AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS.getQueueContext(), "c3",
parentQueue);
newCS.addQueue(c3);
fail("Expected exception for auto queue creation failure");

View File

@ -86,14 +86,14 @@ public void testRefreshQueuesWithReservations() throws Exception {
// Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
new ReservationQueue(cs.getQueueContext(), "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1);
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
// Test add another reservation queue and use setEntitlement to modify
// capacity
ReservationQueue a2 =
new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
new ReservationQueue(cs.getQueueContext(), "a2", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a2);
cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f));
@ -116,7 +116,7 @@ public void testAddQueueFailCases() throws Exception {
try {
// Test invalid addition (adding non-zero size queue)
ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
new ReservationQueue(cs.getQueueContext(), "a1", (PlanQueue) cs.getQueue("a"));
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
cs.addQueue(a1);
fail();
@ -126,7 +126,7 @@ public void testAddQueueFailCases() throws Exception {
// Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
new ReservationQueue(cs.getQueueContext(), "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1);
//set default queue capacity to zero
((ReservationQueue) cs
@ -138,7 +138,7 @@ public void testAddQueueFailCases() throws Exception {
// Test add another reservation queue and use setEntitlement to modify
// capacity
ReservationQueue a2 =
new ReservationQueue(cs, "a2", (PlanQueue) cs.getQueue("a"));
new ReservationQueue(cs.getQueueContext(), "a2", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a2);
@ -165,7 +165,7 @@ public void testRemoveQueue() throws Exception {
// Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
new ReservationQueue(cs.getQueueContext(), "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1);
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
@ -249,7 +249,7 @@ public void testMoveAppToPlanQueue() throws Exception {
// create the default reservation queue
String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
ReservationQueue defQ =
new ReservationQueue(scheduler, defQName,
new ReservationQueue(scheduler.getQueueContext(), defQName,
(PlanQueue) scheduler.getQueue("a"));
scheduler.addQueue(defQ);
defQ.setEntitlement(new QueueEntitlement(1f, 1f));

View File

@ -177,7 +177,7 @@ private void testUserLimitThroughputWithNumberOfResourceTypes(
LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
// For now make user limit large so we can activate all applications
qb.setUserLimitFactor((float)100.0);
qb.setupConfigurableCapacities();
qb.setupConfigurableCapacities(cs.getConfiguration());
lqs[i] = qb;
}

View File

@ -74,6 +74,7 @@ public class TestChildQueueOrder {
YarnConfiguration conf;
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
CapacitySchedulerQueueContext queueContext;
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
@ -100,6 +101,10 @@ public void setUp() throws Exception {
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
new CapacitySchedulerQueueManager(csConf, rmContext.getNodeLabelManager(), null));
queueContext = new CapacitySchedulerQueueContext(csContext);
}
private FiCaSchedulerApp getMockApplication(int appId, String user) {
@ -219,9 +224,10 @@ private void setupSortedQueues(CapacitySchedulerConfiguration conf) {
public void testSortedQueues() throws Exception {
// Setup queue configs
setupSortedQueues(csConf);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);

View File

@ -143,6 +143,7 @@ public class TestLeafQueue {
CapacityScheduler cs;
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
CapacitySchedulerQueueContext queueContext;
private RMApp rmApp;
CSQueue root;
@ -203,6 +204,7 @@ private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
csConf.setBoolean(CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES,
false);
csConf.setResourceComparator(rC.getClass());
final String newRoot = "root" + System.currentTimeMillis();
setupQueueConfiguration(csConf, newRoot, withNodeLabels);
YarnConfiguration conf = new YarnConfiguration();
@ -228,12 +230,20 @@ private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
containerTokenSecretManager.rollMasterKey();
when(csContext.getContainerTokenSecretManager()).thenReturn(
containerTokenSecretManager);
CapacitySchedulerQueueManager queueManager =
new CapacitySchedulerQueueManager(csConf, null, null);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(queueManager);
queueManager.reinitConfiguredNodeLabels(csConf);
queueContext = new CapacitySchedulerQueueContext(csContext);
root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
ROOT,
queues, queues,
TestUtils.spyHook);
queueManager.setRootQueue(root);
root.updateClusterResource(Resources.createResource(100 * 16 * GB, 100 * 32),
new ResourceLimits(Resources.createResource(100 * 16 * GB, 100 * 32)));
@ -242,8 +252,8 @@ private void setUpInternal(ResourceCalculator rC, boolean withNodeLabels)
.thenReturn(queueResUsage);
cs.setRMContext(spyRMContext);
cs.init(csConf);
cs.setResourceCalculator(rC);
cs.init(csConf);
when(spyRMContext.getScheduler()).thenReturn(cs);
when(spyRMContext.getYarnConfiguration())
@ -1087,11 +1097,12 @@ public void testUserLimitCache() throws Exception {
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
100);
queueContext.reinitialize();
// reinitialize queues
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
@ -1305,11 +1316,12 @@ public void testUserLimitCacheActiveUsersChanged() throws Exception {
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
100);
queueContext.reinitialize();
// reinitialize queues
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
@ -1920,6 +1932,7 @@ public void testUserSpecificUserLimits() throws Exception {
+ CapacitySchedulerConfiguration.USER_WEIGHT,
0.7f);
csConf.reinitializeConfigurationProperties();
queueContext.reinitialize();
when(csContext.getClusterResource())
.thenReturn(Resources.createResource(16 * GB, 32));
@ -3216,10 +3229,12 @@ public void testRackLocalityDelayScheduling() throws Exception {
csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 2);
csConf.setInt(
CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 1);
queueContext.reinitialize();
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(queueContext,
csConf, null, ROOT, newQueues, queues,
TestUtils.spyHook);
csContext.getCapacitySchedulerQueueManager().setRootQueue(newRoot);
root.reinitialize(newRoot, cs.getClusterResource());
// Manipulate queue 'b'
@ -3652,9 +3667,10 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT
* 2);
queueContext.reinitialize();
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
ROOT,
newQueues, queues,
TestUtils.spyHook);
@ -3683,12 +3699,14 @@ public void testLocalityDelaysAfterQueueRefresh() throws Exception {
csConf.setInt(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY, 60);
csConf.setInt(
CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY, 600);
queueContext.reinitialize();
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
ROOT,
newQueues, queues,
TestUtils.spyHook);
csContext.getCapacitySchedulerQueueManager().setRootQueue(newRoot);
root.reinitialize(newRoot, cs.getClusterResource());
// after reinitialization
@ -4043,8 +4061,14 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
0.1f);
CapacitySchedulerQueueManager queueManager = new CapacitySchedulerQueueManager(csConf,
rmContext.getNodeLabelManager(), null);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(queueManager);
CapacitySchedulerQueueContext newQueueContext = new CapacitySchedulerQueueContext(csContext);
CSQueue root;
root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
root = CapacitySchedulerQueueManager.parseQueue(newQueueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
@ -4060,9 +4084,10 @@ public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
csConf.setFloat(
CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
0.2f);
newQueueContext.reinitialize();
clusterResource = Resources.createResource(100 * 20 * GB, 100 * 32);
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(newQueueContext,
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
root.reinitialize(newRoot, clusterResource);
@ -5112,15 +5137,15 @@ public void testSetupQueueConfigsWithSpecifiedConfiguration()
assertEquals(0, conf.size());
conf.setNodeLocalityDelay(60);
conf.setCapacity(ROOT + DOT + leafQueueName, 10);
conf.setMaximumCapacity(ROOT + DOT + leafQueueName, 100);
conf.setUserLimitFactor(ROOT + DOT +leafQueueName, 0.1f);
csConf.setCapacity(ROOT + DOT + leafQueueName, 10);
csConf.setMaximumCapacity(ROOT + DOT + leafQueueName, 100);
csConf.setUserLimitFactor(ROOT + DOT +leafQueueName, 0.1f);
csConf.setNodeLocalityDelay(30);
csConf.setGlobalMaximumApplicationsPerQueue(20);
queueContext.reinitialize();
LeafQueue leafQueue = new LeafQueue(csContext, conf,
leafQueueName, cs.getRootQueue(),
LeafQueue leafQueue = new LeafQueue(queueContext, leafQueueName, cs.getRootQueue(),
null);
leafQueue.updateClusterResource(Resource.newInstance(0, 0),
@ -5148,6 +5173,7 @@ public void testSetupQueueConfigsWithSpecifiedConfiguration()
// limit maximum apps by max system apps
csConf.setMaximumSystemApplications(15);
queueContext.reinitialize();
leafQueue.updateClusterResource(Resource.newInstance(0, 0),
new ResourceLimits(Resource.newInstance(0, 0)));

View File

@ -79,7 +79,8 @@ public class TestParentQueue {
YarnConfiguration conf;
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
CapacitySchedulerQueueContext queueContext;
final static int GB = 1024;
final static String DEFAULT_RACK = "/default";
@ -105,6 +106,10 @@ public void setUp() throws Exception {
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(
new CapacitySchedulerQueueManager(csConf, rmContext.getNodeLabelManager(), null));
queueContext = new CapacitySchedulerQueueContext(csContext);
}
private static final String A = "a";
@ -121,6 +126,8 @@ private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
conf.setCapacity(Q_A, 30);
conf.setCapacity(Q_B, 70);
queueContext.reinitialize();
LOG.info("Setup top-level queues a and b");
}
@ -137,6 +144,8 @@ private void setupSingleLevelQueuesWithAbsoluteResource(
conf.setMinimumResourceRequirement("", new QueuePath(Q_B),
QUEUE_B_RESOURCE);
queueContext.reinitialize();
LOG.info("Setup top-level queues a and b with absolute resource");
}
@ -253,7 +262,7 @@ public void testSingleLevelQueues() throws Exception {
CSQueueStore queues = new CSQueueStore();
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -371,11 +380,12 @@ public void testSingleLevelQueuesPrecision() throws Exception {
setupSingleLevelQueues(csConf);
csConf.setCapacity(Q_A, 30);
csConf.setCapacity(Q_B, 70.5F);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
boolean exceptionOccurred = false;
try {
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IOException ie) {
@ -386,10 +396,11 @@ public void testSingleLevelQueuesPrecision() throws Exception {
}
csConf.setCapacity(Q_A, 30);
csConf.setCapacity(Q_B, 70);
queueContext.reinitialize();
exceptionOccurred = false;
queues.clear();
try {
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@ -400,10 +411,11 @@ public void testSingleLevelQueuesPrecision() throws Exception {
}
csConf.setCapacity(Q_A, 30);
csConf.setCapacity(Q_B, 70.005F);
queueContext.reinitialize();
exceptionOccurred = false;
queues.clear();
try {
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@ -470,6 +482,7 @@ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
conf.setQueues(Q_C111, new String[] {C1111});
final String Q_C1111= Q_C111 + "." + C1111;
conf.setCapacity(Q_C1111, 100);
queueContext.reinitialize();
}
@Test
@ -495,7 +508,7 @@ public void testMultiLevelQueues() throws Exception {
CSQueueStore queues = new CSQueueStore();
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -657,9 +670,10 @@ public void testQueueCapacitySettingChildZero() throws Exception {
csConf.setCapacity(Q_B + "." + B1, 0);
csConf.setCapacity(Q_B + "." + B2, 0);
csConf.setCapacity(Q_B + "." + B3, 0);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@ -672,9 +686,10 @@ public void testQueueCapacitySettingParentZero() throws Exception {
// set parent capacity to 0 when child not 0
csConf.setCapacity(Q_B, 0);
csConf.setCapacity(Q_A, 60);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@ -690,8 +705,9 @@ public void testQueueCapacitySettingParentZeroChildren100pctZeroSumAllowed()
csConf.setCapacity(Q_B, 0);
csConf.setCapacity(Q_A, 60);
csConf.setAllowZeroCapacitySum(Q_B, true);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@ -710,8 +726,9 @@ public void testQueueCapacitySettingParentZeroChildren50pctZeroSumAllowed()
csConf.setCapacity(Q_B + "." + B2, 20);
csConf.setCapacity(Q_B + "." + B3, 20);
csConf.setAllowZeroCapacitySum(Q_B, true);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@ -730,8 +747,9 @@ public void testQueueCapacitySettingParentNonZeroChildrenZeroSumAllowed()
csConf.setCapacity(Q_B + "." + B2, 0);
csConf.setCapacity(Q_B + "." + B3, 0);
csConf.setAllowZeroCapacitySum(Q_B, true);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@ -746,12 +764,12 @@ public void testQueueCapacityZero() throws Exception {
csConf.setCapacity(Q_B + "." + B1, 0);
csConf.setCapacity(Q_B + "." + B2, 0);
csConf.setCapacity(Q_B + "." + B3, 0);
csConf.setCapacity(Q_A, 60);
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
try {
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException e) {
@ -767,7 +785,7 @@ public void testOffSwitchScheduling() throws Exception {
CSQueueStore queues = new CSQueueStore();
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -849,7 +867,7 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
//B3
CSQueueStore queues = new CSQueueStore();
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -948,10 +966,11 @@ public void testQueueAcl() throws Exception {
csConf.setAcl(Q_C, QueueACL.ADMINISTER_QUEUE, "*");
final String Q_C11= Q_C + "." + C1 + "." + C11;
csConf.setAcl(Q_C11, QueueACL.SUBMIT_APPLICATIONS, "*");
queueContext.reinitialize();
CSQueueStore queues = new CSQueueStore();
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
YarnAuthorizationProvider authorizer =
@ -1014,7 +1033,7 @@ public void testAbsoluteResourceWithChangeInClusterResource()
setupSingleLevelQueuesWithAbsoluteResource(csConf);
CSQueueStore queues = new CSQueueStore();
CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf,
CSQueue root = CapacitySchedulerQueueManager.parseQueue(queueContext, csConf,
null, CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -1085,7 +1104,7 @@ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
setupSingleLevelQueuesWithAbsoluteResource(csConf);
CSQueueStore queues = new CSQueueStore();
CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf,
CSQueue root = CapacitySchedulerQueueManager.parseQueue(queueContext, csConf,
null, CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@ -1138,6 +1157,7 @@ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
// Set GlobalMaximumApplicationsPerQueue in csConf
csConf.setGlobalMaximumApplicationsPerQueue(8000);
queueContext.reinitialize();
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
@ -1155,6 +1175,7 @@ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
Integer.toString(queueAMaxApplications));
csConf.set("yarn.scheduler.capacity." + Q_B + ".maximum-applications",
Integer.toString(queueBMaxApplications));
queueContext.reinitialize();
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));

View File

@ -46,6 +46,7 @@ public class TestReservationQueue {
private CapacitySchedulerConfiguration csConf;
private CapacitySchedulerContext csContext;
private CapacitySchedulerQueueContext queueContext;
final static int DEF_MAX_APPS = 10000;
final static int GB = 1024;
private final ResourceCalculator resourceCalculator =
@ -63,7 +64,7 @@ public void setup() throws IOException, SchedulerDynamicEditException {
CapacitySchedulerQueueManager csQm = mock(
CapacitySchedulerQueueManager.class);
ConfiguredNodeLabels labels = new ConfiguredNodeLabels(csConf);
when(csQm.getConfiguredNodeLabels()).thenReturn(labels);
when(csQm.getConfiguredNodeLabelsForAllQueues()).thenReturn(labels);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(csQm);
when(csContext.getConf()).thenReturn(conf);
@ -78,9 +79,11 @@ public void setup() throws IOException, SchedulerDynamicEditException {
RMContext mockRMContext = TestUtils.getMockRMContext();
when(csContext.getRMContext()).thenReturn(mockRMContext);
queueContext = new CapacitySchedulerQueueContext(csContext);
// create a queue
planQueue = new PlanQueue(csContext, "root", null, null);
autoCreatedLeafQueue = new ReservationQueue(csContext, "a", planQueue);
planQueue = new PlanQueue(queueContext, "root", null, null);
autoCreatedLeafQueue = new ReservationQueue(queueContext, "a", planQueue);
planQueue.addChildQueue(autoCreatedLeafQueue);
}

View File

@ -90,6 +90,7 @@ public class TestReservations {
CapacityScheduler cs;
// CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
CapacitySchedulerQueueContext queueContext;
private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator();
@ -135,7 +136,10 @@ private void setup(CapacitySchedulerConfiguration csConf,
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 12));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
CapacitySchedulerQueueManager queueManager = new CapacitySchedulerQueueManager(conf,
rmContext.getNodeLabelManager(), null);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(queueManager);
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(
conf);
@ -143,12 +147,11 @@ private void setup(CapacitySchedulerConfiguration csConf,
when(csContext.getContainerTokenSecretManager()).thenReturn(
containerTokenSecretManager);
root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
queueContext = new CapacitySchedulerQueueContext(csContext);
ResourceUsage queueResUsage = root.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
.thenReturn(queueResUsage);
root = CapacitySchedulerQueueManager.parseQueue(queueContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
queueManager.setRootQueue(root);
spyRMContext = spy(rmContext);
when(spyRMContext.getScheduler()).thenReturn(cs);
@ -1181,23 +1184,24 @@ public void testAssignToQueue() throws Exception {
public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
CapacitySchedulerConfiguration csConf) throws Exception {
// before reinitialization
assertEquals(true, a.getReservationContinueLooking());
assertEquals(true, a.isReservationsContinueLooking());
assertEquals(true,
((ParentQueue) a.getParent()).getReservationContinueLooking());
((ParentQueue) a.getParent()).isReservationsContinueLooking());
csConf.setBoolean(
CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
CSQueueStore newQueues = new CSQueueStore();
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
queueContext.reinitialize();
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(queueContext,
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());
// after reinitialization
assertEquals(false, a.getReservationContinueLooking());
assertEquals(false, a.isReservationsContinueLooking());
assertEquals(false,
((ParentQueue) a.getParent()).getReservationContinueLooking());
((ParentQueue) a.getParent()).isReservationsContinueLooking());
}
@Test

View File

@ -58,15 +58,11 @@ public class TestUsersManager {
@Mock
private QueueMetrics metrics;
@Mock
private CapacitySchedulerContext context;
@Before
public void setup() {
usersManager = new UsersManager(metrics,
lQueue,
labelMgr,
context,
new DefaultResourceCalculator());
when(lQueue.getMinimumAllocation()).thenReturn(MINIMUM_ALLOCATION);