YARN-10929. Do not use a separate config in legacy CS AQC. Contributed by Benjamin Teke
This commit is contained in:
parent
8e08f43e03
commit
364d38f00d
|
@ -173,10 +173,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return new QueuePath(parent.getQueuePath(), queueName);
|
return new QueuePath(parent.getQueuePath(), queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setupConfigurableCapacities(
|
protected void setupConfigurableCapacities() {
|
||||||
CapacitySchedulerConfiguration configuration) {
|
|
||||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
|
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
|
||||||
configuration, this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
queueContext.getConfiguration(), this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -329,14 +328,14 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return this.queueNodeLabelsSettings.getDefaultLabelExpression();
|
return this.queueNodeLabelsSettings.getDefaultLabelExpression();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setupQueueConfigs(Resource clusterResource,
|
protected void setupQueueConfigs(Resource clusterResource) throws
|
||||||
CapacitySchedulerConfiguration configuration) throws
|
|
||||||
IOException {
|
IOException {
|
||||||
|
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
|
||||||
if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
|
if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
|
||||||
setDynamicQueueProperties(configuration);
|
setDynamicQueueProperties();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect and set the Node label configuration
|
// Collect and set the Node label configuration
|
||||||
|
@ -344,7 +343,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
getQueuePath(), queueContext.getQueueManager().getConfiguredNodeLabelsForAllQueues());
|
getQueuePath(), queueContext.getQueueManager().getConfiguredNodeLabelsForAllQueues());
|
||||||
|
|
||||||
// Initialize the queue capacities
|
// Initialize the queue capacities
|
||||||
setupConfigurableCapacities(configuration);
|
setupConfigurableCapacities();
|
||||||
updateAbsoluteCapacities();
|
updateAbsoluteCapacities();
|
||||||
updateCapacityConfigType();
|
updateCapacityConfigType();
|
||||||
|
|
||||||
|
@ -354,26 +353,23 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
// Setup queue's maximumAllocation respecting the global
|
// Setup queue's maximumAllocation respecting the global
|
||||||
// and the queue settings
|
// and the queue settings
|
||||||
// TODO remove the getConfiguration() param after the AQC configuration duplication
|
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
|
||||||
// removal is resolved
|
|
||||||
this.queueAllocationSettings.setupMaximumAllocation(configuration,
|
|
||||||
queueContext.getConfiguration(), getQueuePath(),
|
|
||||||
parent);
|
parent);
|
||||||
|
|
||||||
// Initialize the queue state based on previous state, configured state
|
// Initialize the queue state based on previous state, configured state
|
||||||
// and its parent state
|
// and its parent state
|
||||||
initializeQueueState(configuration);
|
initializeQueueState();
|
||||||
|
|
||||||
authorizer = YarnAuthorizationProvider.getInstance(configuration);
|
authorizer = YarnAuthorizationProvider.getInstance(configuration);
|
||||||
|
|
||||||
this.acls = configuration.getAcls(getQueuePath());
|
this.acls = configuration.getAcls(getQueuePath());
|
||||||
|
|
||||||
this.userWeights = getUserWeightsFromHierarchy(configuration);
|
this.userWeights = getUserWeightsFromHierarchy();
|
||||||
|
|
||||||
this.reservationsContinueLooking =
|
this.reservationsContinueLooking =
|
||||||
configuration.getReservationContinueLook();
|
configuration.getReservationContinueLook();
|
||||||
|
|
||||||
this.configuredCapacityVectors = queueContext.getConfiguration()
|
this.configuredCapacityVectors = configuration
|
||||||
.parseConfiguredResourceVector(queuePath.getFullPath(),
|
.parseConfiguredResourceVector(queuePath.getFullPath(),
|
||||||
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
||||||
|
|
||||||
|
@ -382,10 +378,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
this, labelManager, null);
|
this, labelManager, null);
|
||||||
|
|
||||||
// Store preemption settings
|
// Store preemption settings
|
||||||
// TODO remove the getConfiguration() param after the AQC configuration duplication
|
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration);
|
||||||
// removal is resolved
|
|
||||||
this.preemptionSettings = new CSQueuePreemptionSettings(this, configuration,
|
|
||||||
queueContext.getConfiguration());
|
|
||||||
this.priority = configuration.getQueuePriority(
|
this.priority = configuration.getQueuePriority(
|
||||||
getQueuePath());
|
getQueuePath());
|
||||||
|
|
||||||
|
@ -403,14 +396,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set properties specific to dynamic queues.
|
* Set properties specific to dynamic queues.
|
||||||
* @param configuration configuration on which the properties are set
|
|
||||||
*/
|
*/
|
||||||
protected void setDynamicQueueProperties(
|
protected void setDynamicQueueProperties() {
|
||||||
CapacitySchedulerConfiguration configuration) {
|
|
||||||
// Set properties from parent template
|
// Set properties from parent template
|
||||||
if (parent instanceof ParentQueue) {
|
if (parent instanceof ParentQueue) {
|
||||||
((ParentQueue) parent).getAutoCreatedQueueTemplate()
|
((ParentQueue) parent).getAutoCreatedQueueTemplate()
|
||||||
.setTemplateEntriesForChild(configuration, getQueuePath());
|
.setTemplateEntriesForChild(queueContext.getConfiguration(), getQueuePath());
|
||||||
|
|
||||||
String parentTemplate = String.format("%s.%s", parent.getQueuePath(),
|
String parentTemplate = String.format("%s.%s", parent.getQueuePath(),
|
||||||
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
|
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
|
||||||
|
@ -428,8 +419,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private UserWeights getUserWeightsFromHierarchy(
|
private UserWeights getUserWeightsFromHierarchy() {
|
||||||
CapacitySchedulerConfiguration configuration) {
|
|
||||||
UserWeights unionInheritedWeights = UserWeights.createEmpty();
|
UserWeights unionInheritedWeights = UserWeights.createEmpty();
|
||||||
CSQueue parentQ = parent;
|
CSQueue parentQ = parent;
|
||||||
if (parentQ != null) {
|
if (parentQ != null) {
|
||||||
|
@ -439,7 +429,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
// Insert this queue's userWeights, overriding parent's userWeights if
|
// Insert this queue's userWeights, overriding parent's userWeights if
|
||||||
// there is an overlap.
|
// there is an overlap.
|
||||||
unionInheritedWeights.addFrom(
|
unionInheritedWeights.addFrom(
|
||||||
configuration.getAllUserWeightsForQueue(getQueuePath()));
|
queueContext.getConfiguration().getAllUserWeightsForQueue(getQueuePath()));
|
||||||
return unionInheritedWeights;
|
return unionInheritedWeights;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -572,9 +562,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return configuredCapacityVectors.get(label);
|
return configuredCapacityVectors.get(label);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initializeQueueState(CapacitySchedulerConfiguration configuration) {
|
private void initializeQueueState() {
|
||||||
QueueState previousState = getState();
|
QueueState previousState = getState();
|
||||||
QueueState configuredState = configuration
|
QueueState configuredState = queueContext.getConfiguration()
|
||||||
.getConfiguredState(getQueuePath());
|
.getConfiguredState(getQueuePath());
|
||||||
QueueState parentState = (parent == null) ? null : parent.getState();
|
QueueState parentState = (parent == null) ? null : parent.getState();
|
||||||
|
|
||||||
|
|
|
@ -166,15 +166,12 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("checkstyle:nowhitespaceafter")
|
@SuppressWarnings("checkstyle:nowhitespaceafter")
|
||||||
protected void setupQueueConfigs(Resource clusterResource,
|
protected void setupQueueConfigs(Resource clusterResource) throws
|
||||||
CapacitySchedulerConfiguration conf) throws
|
|
||||||
IOException {
|
IOException {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
// TODO conf parameter can be a modified configuration with template entries and missing
|
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
|
||||||
// some global configs. This config duplication needs to be removed.
|
super.setupQueueConfigs(clusterResource);
|
||||||
CapacitySchedulerConfiguration originalConfiguration = queueContext.getConfiguration();
|
|
||||||
super.setupQueueConfigs(clusterResource, conf);
|
|
||||||
|
|
||||||
this.lastClusterResource = clusterResource;
|
this.lastClusterResource = clusterResource;
|
||||||
|
|
||||||
|
@ -189,26 +186,26 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
setQueueResourceLimitsInfo(clusterResource);
|
setQueueResourceLimitsInfo(clusterResource);
|
||||||
|
|
||||||
setOrderingPolicy(
|
setOrderingPolicy(
|
||||||
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
|
configuration.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
|
||||||
|
|
||||||
usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
|
usersManager.setUserLimit(configuration.getUserLimit(getQueuePath()));
|
||||||
usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
|
usersManager.setUserLimitFactor(configuration.getUserLimitFactor(getQueuePath()));
|
||||||
|
|
||||||
maxAMResourcePerQueuePercent =
|
maxAMResourcePerQueuePercent =
|
||||||
conf.getMaximumApplicationMasterResourcePerQueuePercent(
|
configuration.getMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
getQueuePath());
|
getQueuePath());
|
||||||
|
|
||||||
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
|
maxApplications = configuration.getMaximumApplicationsPerQueue(getQueuePath());
|
||||||
if (maxApplications < 0) {
|
if (maxApplications < 0) {
|
||||||
int maxGlobalPerQueueApps =
|
int maxGlobalPerQueueApps =
|
||||||
conf.getGlobalMaximumApplicationsPerQueue();
|
configuration.getGlobalMaximumApplicationsPerQueue();
|
||||||
if (maxGlobalPerQueueApps > 0) {
|
if (maxGlobalPerQueueApps > 0) {
|
||||||
maxApplications = maxGlobalPerQueueApps;
|
maxApplications = maxGlobalPerQueueApps;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
priorityAcls = conf.getPriorityAcls(getQueuePath(),
|
priorityAcls = configuration.getPriorityAcls(getQueuePath(),
|
||||||
originalConfiguration.getClusterLevelApplicationMaxPriority());
|
configuration.getClusterLevelApplicationMaxPriority());
|
||||||
|
|
||||||
Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
|
Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
|
||||||
if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
|
if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
|
||||||
|
@ -224,10 +221,10 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
.join(getAccessibleNodeLabels().iterator(), ',')));
|
.join(getAccessibleNodeLabels().iterator(), ',')));
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeLocalityDelay = originalConfiguration.getNodeLocalityDelay();
|
nodeLocalityDelay = configuration.getNodeLocalityDelay();
|
||||||
rackLocalityAdditionalDelay = originalConfiguration
|
rackLocalityAdditionalDelay = configuration
|
||||||
.getRackLocalityAdditionalDelay();
|
.getRackLocalityAdditionalDelay();
|
||||||
rackLocalityFullReset = originalConfiguration
|
rackLocalityFullReset = configuration
|
||||||
.getRackLocalityFullReset();
|
.getRackLocalityFullReset();
|
||||||
|
|
||||||
// re-init this since max allocation could have changed
|
// re-init this since max allocation could have changed
|
||||||
|
@ -250,10 +247,10 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
defaultAppPriorityPerQueue = Priority.newInstance(
|
defaultAppPriorityPerQueue = Priority.newInstance(
|
||||||
conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
|
configuration.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
|
||||||
|
|
||||||
// Validate leaf queue's user's weights.
|
// Validate leaf queue's user's weights.
|
||||||
float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
|
float queueUserLimit = Math.min(100.0f, configuration.getUserLimit(getQueuePath()));
|
||||||
getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath());
|
getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath());
|
||||||
usersManager.updateUserWeights();
|
usersManager.updateUserWeights();
|
||||||
|
|
||||||
|
@ -529,9 +526,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void reinitialize(
|
@Override
|
||||||
CSQueue newlyParsedQueue, Resource clusterResource,
|
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws
|
||||||
CapacitySchedulerConfiguration configuration) throws
|
|
||||||
IOException {
|
IOException {
|
||||||
|
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -565,20 +561,12 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
+ newMax);
|
+ newMax);
|
||||||
}
|
}
|
||||||
|
|
||||||
setupQueueConfigs(clusterResource, configuration);
|
setupQueueConfigs(clusterResource);
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void reinitialize(
|
|
||||||
CSQueue newlyParsedQueue, Resource clusterResource)
|
|
||||||
throws IOException {
|
|
||||||
reinitialize(newlyParsedQueue, clusterResource,
|
|
||||||
queueContext.getConfiguration());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
||||||
String userName) {
|
String userName) {
|
||||||
|
@ -1700,13 +1688,13 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setDynamicQueueProperties(CapacitySchedulerConfiguration configuration) {
|
protected void setDynamicQueueProperties() {
|
||||||
// set to -1, to disable it
|
// set to -1, to disable it
|
||||||
configuration.setUserLimitFactor(getQueuePath(), -1);
|
queueContext.getConfiguration().setUserLimitFactor(getQueuePath(), -1);
|
||||||
// Set Max AM percentage to a higher value
|
// Set Max AM percentage to a higher value
|
||||||
configuration.setMaximumApplicationMasterResourcePerQueuePercent(
|
queueContext.getConfiguration().setMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
getQueuePath(), 1f);
|
getQueuePath(), 1f);
|
||||||
super.setDynamicQueueProperties(configuration);
|
super.setDynamicQueueProperties();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateSchedulerHealthForCompletedContainer(
|
private void updateSchedulerHealthForCompletedContainer(
|
||||||
|
@ -1948,7 +1936,7 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
super.updateEffectiveResources(clusterResource);
|
super.updateEffectiveResources(clusterResource);
|
||||||
|
|
||||||
// Update maximum applications for the queue and for users
|
// Update maximum applications for the queue and for users
|
||||||
updateMaximumApplications(queueContext.getConfiguration());
|
updateMaximumApplications();
|
||||||
|
|
||||||
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
|
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
|
||||||
|
|
||||||
|
@ -2342,11 +2330,12 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateMaximumApplications(CapacitySchedulerConfiguration conf) {
|
void updateMaximumApplications() {
|
||||||
int maxAppsForQueue = conf.getMaximumApplicationsPerQueue(getQueuePath());
|
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
|
||||||
|
int maxAppsForQueue = configuration.getMaximumApplicationsPerQueue(getQueuePath());
|
||||||
|
|
||||||
int maxDefaultPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
|
int maxDefaultPerQueueApps = configuration.getGlobalMaximumApplicationsPerQueue();
|
||||||
int maxSystemApps = conf.getMaximumSystemApplications();
|
int maxSystemApps = configuration.getMaximumSystemApplications();
|
||||||
int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
|
int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
|
||||||
Math.min(maxDefaultPerQueueApps, maxSystemApps)
|
Math.min(maxDefaultPerQueueApps, maxSystemApps)
|
||||||
: maxSystemApps;
|
: maxSystemApps;
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
|
||||||
|
@ -55,7 +54,7 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
// Set new configs
|
// Set new configs
|
||||||
setupQueueConfigs(clusterResource, queueContext.getConfiguration());
|
setupQueueConfigs(clusterResource);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
|
@ -175,22 +174,12 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
|
||||||
CapacitySchedulerConfiguration leafQueueConfigs = new
|
CapacitySchedulerConfiguration leafQueueConfigs = new
|
||||||
CapacitySchedulerConfiguration(new Configuration(false), false);
|
CapacitySchedulerConfiguration(new Configuration(false), false);
|
||||||
|
|
||||||
Map<String, String> rtProps = queueContext
|
|
||||||
.getConfiguration().getConfigurationProperties()
|
|
||||||
.getPropertiesWithPrefix(YarnConfiguration.RESOURCE_TYPES + ".", true);
|
|
||||||
for (Map.Entry<String, String> entry : rtProps.entrySet()) {
|
|
||||||
leafQueueConfigs.set(entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
Map<String, String> templateConfigs = queueContext
|
Map<String, String> templateConfigs = queueContext
|
||||||
.getConfiguration().getConfigurationProperties()
|
.getConfiguration().getConfigurationProperties()
|
||||||
.getPropertiesWithPrefix(configPrefix, true);
|
.getPropertiesWithPrefix(configPrefix, true);
|
||||||
|
|
||||||
for (final Iterator<Map.Entry<String, String>> iterator =
|
for (Map.Entry<String, String> confKeyValuePair : templateConfigs.entrySet()) {
|
||||||
templateConfigs.entrySet().iterator(); iterator.hasNext(); ) {
|
leafQueueConfigs.set(confKeyValuePair.getKey(), confKeyValuePair.getValue());
|
||||||
Map.Entry<String, String> confKeyValuePair = iterator.next();
|
|
||||||
leafQueueConfigs.set(confKeyValuePair.getKey(),
|
|
||||||
confKeyValuePair.getValue());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return leafQueueConfigs;
|
return leafQueueConfigs;
|
||||||
|
|
|
@ -42,7 +42,8 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
|
||||||
public AutoCreatedLeafQueue(CapacitySchedulerQueueContext queueContext, String queueName,
|
public AutoCreatedLeafQueue(CapacitySchedulerQueueContext queueContext, String queueName,
|
||||||
ManagedParentQueue parent) throws IOException {
|
ManagedParentQueue parent) throws IOException {
|
||||||
super(queueContext, queueName, parent, null);
|
super(queueContext, queueName, parent, null);
|
||||||
super.setupQueueConfigs(queueContext.getClusterResource(), parent.getLeafQueueConfigs(queueName));
|
parent.setLeafQueueConfigs(queueName);
|
||||||
|
super.setupQueueConfigs(queueContext.getClusterResource());
|
||||||
|
|
||||||
updateCapacitiesToZero();
|
updateCapacitiesToZero();
|
||||||
}
|
}
|
||||||
|
@ -56,8 +57,8 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
|
||||||
|
|
||||||
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent;
|
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent;
|
||||||
|
|
||||||
super.reinitialize(newlyParsedQueue, clusterResource, managedParentQueue
|
managedParentQueue.setLeafQueueConfigs(newlyParsedQueue.getQueueShortName());
|
||||||
.getLeafQueueConfigs(newlyParsedQueue.getQueueShortName()));
|
super.reinitialize(newlyParsedQueue, clusterResource);
|
||||||
|
|
||||||
//Reset capacities to 0 since reinitialize above
|
//Reset capacities to 0 since reinitialize above
|
||||||
// queueCapacities to initialize to configured capacity which might
|
// queueCapacities to initialize to configured capacity which might
|
||||||
|
@ -122,8 +123,7 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setDynamicQueueProperties(
|
protected void setDynamicQueueProperties() {
|
||||||
CapacitySchedulerConfiguration configuration) {
|
|
||||||
String parentTemplate = String.format("%s.%s", getParent().getQueuePath(),
|
String parentTemplate = String.format("%s.%s", getParent().getQueuePath(),
|
||||||
CapacitySchedulerConfiguration
|
CapacitySchedulerConfiguration
|
||||||
.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX);
|
.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX);
|
||||||
|
|
|
@ -26,13 +26,10 @@ public class CSQueuePreemptionSettings {
|
||||||
|
|
||||||
public CSQueuePreemptionSettings(
|
public CSQueuePreemptionSettings(
|
||||||
CSQueue queue,
|
CSQueue queue,
|
||||||
CapacitySchedulerConfiguration configuration,
|
CapacitySchedulerConfiguration configuration) {
|
||||||
CapacitySchedulerConfiguration originalSchedulerConfiguration) {
|
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(queue, configuration);
|
||||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(queue, configuration,
|
|
||||||
originalSchedulerConfiguration);
|
|
||||||
this.intraQueuePreemptionDisabledInHierarchy =
|
this.intraQueuePreemptionDisabledInHierarchy =
|
||||||
isIntraQueueHierarchyPreemptionDisabled(queue, configuration,
|
isIntraQueueHierarchyPreemptionDisabled(queue, configuration);
|
||||||
originalSchedulerConfiguration);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -46,10 +43,9 @@ public class CSQueuePreemptionSettings {
|
||||||
* @return true if queue has cross-queue preemption disabled, false otherwise
|
* @return true if queue has cross-queue preemption disabled, false otherwise
|
||||||
*/
|
*/
|
||||||
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
|
private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
|
||||||
CapacitySchedulerConfiguration configuration,
|
CapacitySchedulerConfiguration configuration) {
|
||||||
CapacitySchedulerConfiguration originalSchedulerConfiguration) {
|
|
||||||
boolean systemWidePreemption =
|
boolean systemWidePreemption =
|
||||||
originalSchedulerConfiguration
|
configuration
|
||||||
.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
|
||||||
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
|
||||||
CSQueue parentQ = q.getParent();
|
CSQueue parentQ = q.getParent();
|
||||||
|
@ -85,10 +81,9 @@ public class CSQueuePreemptionSettings {
|
||||||
* @return true if queue has intra-queue preemption disabled, false otherwise
|
* @return true if queue has intra-queue preemption disabled, false otherwise
|
||||||
*/
|
*/
|
||||||
private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
|
private boolean isIntraQueueHierarchyPreemptionDisabled(CSQueue q,
|
||||||
CapacitySchedulerConfiguration configuration,
|
CapacitySchedulerConfiguration configuration) {
|
||||||
CapacitySchedulerConfiguration originalSchedulerConfiguration) {
|
|
||||||
boolean systemWideIntraQueuePreemption =
|
boolean systemWideIntraQueuePreemption =
|
||||||
originalSchedulerConfiguration.getBoolean(
|
configuration.getBoolean(
|
||||||
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
|
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
|
||||||
CapacitySchedulerConfiguration
|
CapacitySchedulerConfiguration
|
||||||
.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
|
.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
|
||||||
|
|
|
@ -96,6 +96,10 @@ public class CapacitySchedulerQueueContext {
|
||||||
return configuration;
|
return configuration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setConfigurationEntry(String name, String value) {
|
||||||
|
this.configuration.set(name, value);
|
||||||
|
}
|
||||||
|
|
||||||
public Resource getMinimumAllocation() {
|
public Resource getMinimumAllocation() {
|
||||||
return minimumAllocation;
|
return minimumAllocation;
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,6 @@ public class LeafQueue extends AbstractLeafQueue {
|
||||||
IOException {
|
IOException {
|
||||||
super(queueContext, queueName, parent, old, isDynamic);
|
super(queueContext, queueName, parent, old, isDynamic);
|
||||||
|
|
||||||
setupQueueConfigs(queueContext.getClusterResource(), queueContext.getConfiguration());
|
setupQueueConfigs(queueContext.getClusterResource());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
|
@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -454,25 +452,13 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public CapacitySchedulerConfiguration getLeafQueueConfigs(
|
public void setLeafQueueConfigs(String leafQueueName) {
|
||||||
String leafQueueName) {
|
CapacitySchedulerConfiguration templateConfig = leafQueueTemplate.getLeafQueueConfigs();
|
||||||
return getLeafQueueConfigs(getLeafQueueTemplate().getLeafQueueConfigs(),
|
for (Map.Entry<String, String> confKeyValuePair : templateConfig) {
|
||||||
leafQueueName);
|
final String name = confKeyValuePair.getKey()
|
||||||
}
|
.replaceFirst(CapacitySchedulerConfiguration.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX,
|
||||||
|
leafQueueName);
|
||||||
public CapacitySchedulerConfiguration getLeafQueueConfigs(
|
queueContext.setConfigurationEntry(name, confKeyValuePair.getValue());
|
||||||
CapacitySchedulerConfiguration templateConfig, String leafQueueName) {
|
|
||||||
CapacitySchedulerConfiguration leafQueueConfigTemplate = new
|
|
||||||
CapacitySchedulerConfiguration(new Configuration(false), false);
|
|
||||||
for (final Iterator<Map.Entry<String, String>> iterator =
|
|
||||||
templateConfig.iterator(); iterator.hasNext();) {
|
|
||||||
Map.Entry<String, String> confKeyValuePair = iterator.next();
|
|
||||||
final String name = confKeyValuePair.getKey().replaceFirst(
|
|
||||||
CapacitySchedulerConfiguration
|
|
||||||
.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX,
|
|
||||||
leafQueueName);
|
|
||||||
leafQueueConfigTemplate.set(name, confKeyValuePair.getValue());
|
|
||||||
}
|
}
|
||||||
return leafQueueConfigTemplate;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -134,7 +134,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
queueContext.getConfiguration()
|
queueContext.getConfiguration()
|
||||||
.getAllowZeroCapacitySum(getQueuePath());
|
.getAllowZeroCapacitySum(getQueuePath());
|
||||||
|
|
||||||
setupQueueConfigs(queueContext.getClusterResource(), queueContext.getConfiguration());
|
setupQueueConfigs(queueContext.getClusterResource());
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns what is configured queue ordering policy
|
// returns what is configured queue ordering policy
|
||||||
|
@ -144,14 +144,14 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
queueOrderingPolicy.getConfigName();
|
queueOrderingPolicy.getConfigName();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setupQueueConfigs(Resource clusterResource,
|
protected void setupQueueConfigs(Resource clusterResource)
|
||||||
CapacitySchedulerConfiguration configuration)
|
|
||||||
throws IOException {
|
throws IOException {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
CapacitySchedulerConfiguration configuration = queueContext.getConfiguration();
|
||||||
autoCreatedQueueTemplate = new AutoCreatedQueueTemplate(
|
autoCreatedQueueTemplate = new AutoCreatedQueueTemplate(
|
||||||
configuration, this.queuePath);
|
configuration, this.queuePath);
|
||||||
super.setupQueueConfigs(clusterResource, configuration);
|
super.setupQueueConfigs(clusterResource);
|
||||||
StringBuilder aclsString = new StringBuilder();
|
StringBuilder aclsString = new StringBuilder();
|
||||||
for (Map.Entry<AccessType, AccessControlList> e : getACLs().entrySet()) {
|
for (Map.Entry<AccessType, AccessControlList> e : getACLs().entrySet()) {
|
||||||
aclsString.append(e.getKey()).append(":")
|
aclsString.append(e.getKey()).append(":")
|
||||||
|
@ -633,7 +633,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;
|
ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;
|
||||||
|
|
||||||
// Set new configs
|
// Set new configs
|
||||||
setupQueueConfigs(clusterResource, queueContext.getConfiguration());
|
setupQueueConfigs(clusterResource);
|
||||||
|
|
||||||
// Re-configure existing child queues and add new ones
|
// Re-configure existing child queues and add new ones
|
||||||
// The CS has already checked to ensure all existing child queues are present!
|
// The CS has already checked to ensure all existing child queues are present!
|
||||||
|
|
|
@ -104,7 +104,7 @@ public class PlanQueue extends AbstractManagedParentQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set new configs
|
// Set new configs
|
||||||
setupQueueConfigs(clusterResource, queueContext.getConfiguration());
|
setupQueueConfigs(clusterResource);
|
||||||
|
|
||||||
updateQuotas(newlyParsedParentQueue.userLimit,
|
updateQuotas(newlyParsedParentQueue.userLimit,
|
||||||
newlyParsedParentQueue.userLimitFactor,
|
newlyParsedParentQueue.userLimitFactor,
|
||||||
|
|
|
@ -36,17 +36,10 @@ public class QueueAllocationSettings {
|
||||||
this.minimumAllocation = minimumAllocation;
|
this.minimumAllocation = minimumAllocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setupMaximumAllocation(CapacitySchedulerConfiguration configuration,
|
void setupMaximumAllocation(CapacitySchedulerConfiguration configuration, String queuePath,
|
||||||
CapacitySchedulerConfiguration originalSchedulerConfiguration, String queuePath,
|
|
||||||
CSQueue parent) {
|
CSQueue parent) {
|
||||||
/* YARN-10869: When using AutoCreatedLeafQueues, the passed configuration
|
|
||||||
* object is a cloned one containing only the template configs
|
|
||||||
* (see ManagedParentQueue#getLeafQueueConfigs). To ensure that the actual
|
|
||||||
* cluster maximum allocation is fetched the original config object should
|
|
||||||
* be used.
|
|
||||||
*/
|
|
||||||
Resource clusterMax = ResourceUtils
|
Resource clusterMax = ResourceUtils
|
||||||
.fetchMaximumAllocationFromConfig(originalSchedulerConfiguration);
|
.fetchMaximumAllocationFromConfig(configuration);
|
||||||
Resource queueMax = configuration.getQueueMaximumAllocation(queuePath);
|
Resource queueMax = configuration.getQueueMaximumAllocation(queuePath);
|
||||||
|
|
||||||
maximumAllocation = Resources.clone(
|
maximumAllocation = Resources.clone(
|
||||||
|
|
|
@ -39,8 +39,7 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
|
||||||
public ReservationQueue(CapacitySchedulerQueueContext queueContext, String queueName,
|
public ReservationQueue(CapacitySchedulerQueueContext queueContext, String queueName,
|
||||||
PlanQueue parent) throws IOException {
|
PlanQueue parent) throws IOException {
|
||||||
super(queueContext, queueName, parent, null);
|
super(queueContext, queueName, parent, null);
|
||||||
super.setupQueueConfigs(queueContext.getClusterResource(),
|
super.setupQueueConfigs(queueContext.getClusterResource());
|
||||||
queueContext.getConfiguration());
|
|
||||||
|
|
||||||
// the following parameters are common to all reservation in the plan
|
// the following parameters are common to all reservation in the plan
|
||||||
updateQuotas(parent.getUserLimitForReservation(),
|
updateQuotas(parent.getUserLimitForReservation(),
|
||||||
|
@ -84,8 +83,7 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void setupConfigurableCapacities(CapacitySchedulerConfiguration
|
protected void setupConfigurableCapacities() {
|
||||||
configuration) {
|
|
||||||
super.updateAbsoluteCapacities();
|
super.updateAbsoluteCapacities();
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -177,7 +177,7 @@ public class TestCapacitySchedulerPerf {
|
||||||
LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
|
LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
|
||||||
// For now make user limit large so we can activate all applications
|
// For now make user limit large so we can activate all applications
|
||||||
qb.setUserLimitFactor((float)100.0);
|
qb.setUserLimitFactor((float)100.0);
|
||||||
qb.setupConfigurableCapacities(cs.getConfiguration());
|
qb.setupConfigurableCapacities();
|
||||||
lqs[i] = qb;
|
lqs[i] = qb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue