YARN-10904. Investigate: Remove unnecessary fields from AbstractCSQueue (#3551) contributed by Szilard Nemeth
This commit is contained in:
parent
66ac476b48
commit
d598904046
|
@ -111,7 +111,7 @@ public class AbstractAutoCreatedLeafQueue extends LeafQueue {
|
||||||
}
|
}
|
||||||
setCapacity(nodeLabel, capacity);
|
setCapacity(nodeLabel, capacity);
|
||||||
setAbsoluteCapacity(nodeLabel,
|
setAbsoluteCapacity(nodeLabel,
|
||||||
getParent().getQueueCapacities().
|
this.getParent().getQueueCapacities().
|
||||||
getAbsoluteCapacity(nodeLabel)
|
getAbsoluteCapacity(nodeLabel)
|
||||||
* getQueueCapacities().getCapacity(nodeLabel));
|
* getQueueCapacities().getCapacity(nodeLabel));
|
||||||
// note: we currently set maxCapacity to capacity
|
// note: we currently set maxCapacity to capacity
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.QueueStatistics;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.security.AccessRequest;
|
import org.apache.hadoop.yarn.security.AccessRequest;
|
||||||
|
@ -77,28 +75,24 @@ import java.util.Set;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
|
|
||||||
|
|
||||||
public abstract class AbstractCSQueue implements CSQueue {
|
public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AbstractCSQueue.class);
|
LoggerFactory.getLogger(AbstractCSQueue.class);
|
||||||
|
protected final QueueAllocationSettings queueAllocationSettings;
|
||||||
volatile CSQueue parent;
|
volatile CSQueue parent;
|
||||||
|
protected final QueuePath queuePath;
|
||||||
final String queueName;
|
final String queueName;
|
||||||
private final String queuePath;
|
protected QueueNodeLabelsSettings queueNodeLabelsSettings;
|
||||||
|
private volatile QueueAppLifetimeAndLimitSettings queueAppLifetimeSettings;
|
||||||
|
private CSQueuePreemptionSettings preemptionSettings;
|
||||||
|
|
||||||
final Resource minimumAllocation;
|
|
||||||
volatile Resource maximumAllocation;
|
|
||||||
private volatile QueueState state = null;
|
private volatile QueueState state = null;
|
||||||
protected final PrivilegedEntity queueEntity;
|
protected final PrivilegedEntity queueEntity;
|
||||||
|
|
||||||
final ResourceCalculator resourceCalculator;
|
final ResourceCalculator resourceCalculator;
|
||||||
Set<String> accessibleLabels;
|
|
||||||
protected Set<String> configuredNodeLabels;
|
|
||||||
Set<String> resourceTypes;
|
Set<String> resourceTypes;
|
||||||
final RMNodeLabelsManager labelManager;
|
final RMNodeLabelsManager labelManager;
|
||||||
String defaultLabelExpression;
|
|
||||||
private String multiNodeSortingPolicyName = null;
|
private String multiNodeSortingPolicyName = null;
|
||||||
|
|
||||||
Map<AccessType, AccessControlList> acls =
|
Map<AccessType, AccessControlList> acls =
|
||||||
|
@ -109,17 +103,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
// used-capacity/abs-used-capacity/capacity/abs-capacity,
|
// used-capacity/abs-used-capacity/capacity/abs-capacity,
|
||||||
// etc.
|
// etc.
|
||||||
QueueCapacities queueCapacities;
|
QueueCapacities queueCapacities;
|
||||||
|
|
||||||
// -1 indicates lifetime is disabled
|
|
||||||
private volatile long maxApplicationLifetime = -1;
|
|
||||||
|
|
||||||
private volatile long defaultApplicationLifetime = -1;
|
|
||||||
|
|
||||||
// Indicates if this queue's default lifetime was set by a config property,
|
|
||||||
// either at this level or anywhere in the queue's hierarchy.
|
|
||||||
private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false;
|
|
||||||
private CSQueuePreemption preemptionSettings;
|
|
||||||
|
|
||||||
CSQueueUsageTracker usageTracker;
|
CSQueueUsageTracker usageTracker;
|
||||||
|
|
||||||
public enum CapacityConfigType {
|
public enum CapacityConfigType {
|
||||||
|
@ -145,7 +128,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
volatile Priority priority = Priority.newInstance(0);
|
volatile Priority priority = Priority.newInstance(0);
|
||||||
private UserWeights userWeights = UserWeights.createEmpty();
|
private UserWeights userWeights = UserWeights.createEmpty();
|
||||||
private int maxParallelApps;
|
|
||||||
|
|
||||||
// is it a dynamic queue?
|
// is it a dynamic queue?
|
||||||
private boolean dynamicQueue = false;
|
private boolean dynamicQueue = false;
|
||||||
|
@ -158,12 +140,10 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public AbstractCSQueue(CapacitySchedulerContext cs,
|
public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||||
CapacitySchedulerConfiguration configuration, String queueName,
|
CapacitySchedulerConfiguration configuration, String queueName,
|
||||||
CSQueue parent, CSQueue old) {
|
CSQueue parent, CSQueue old) {
|
||||||
|
|
||||||
this.labelManager = cs.getRMContext().getNodeLabelManager();
|
this.labelManager = cs.getRMContext().getNodeLabelManager();
|
||||||
this.parent = parent;
|
this.parent = parent;
|
||||||
this.queueName = queueName;
|
this.queuePath = createQueuePath(parent, queueName);
|
||||||
this.queuePath = ((parent == null) ? "" : (parent.getQueuePath() + "."))
|
this.queueName = queuePath.getLeafName();
|
||||||
+ this.queueName;
|
|
||||||
this.resourceCalculator = cs.getResourceCalculator();
|
this.resourceCalculator = cs.getResourceCalculator();
|
||||||
this.activitiesManager = cs.getActivitiesManager();
|
this.activitiesManager = cs.getActivitiesManager();
|
||||||
|
|
||||||
|
@ -174,7 +154,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
cs.getConfiguration().getEnableUserMetrics(), configuration);
|
cs.getConfiguration().getEnableUserMetrics(), configuration);
|
||||||
usageTracker = new CSQueueUsageTracker(metrics);
|
usageTracker = new CSQueueUsageTracker(metrics);
|
||||||
this.csContext = cs;
|
this.csContext = cs;
|
||||||
this.minimumAllocation = csContext.getMinimumResourceCapability();
|
this.queueAllocationSettings = new QueueAllocationSettings(csContext);
|
||||||
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
|
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
|
||||||
queueCapacities = new QueueCapacities(parent == null);
|
queueCapacities = new QueueCapacities(parent == null);
|
||||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
@ -182,6 +162,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
writeLock = lock.writeLock();
|
writeLock = lock.writeLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static QueuePath createQueuePath(CSQueue parent, String queueName) {
|
||||||
|
if (parent == null) {
|
||||||
|
return new QueuePath(null, queueName);
|
||||||
|
}
|
||||||
|
return new QueuePath(parent.getQueuePath(), queueName);
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected void setupConfigurableCapacities() {
|
protected void setupConfigurableCapacities() {
|
||||||
setupConfigurableCapacities(csContext.getConfiguration());
|
setupConfigurableCapacities(csContext.getConfiguration());
|
||||||
|
@ -190,12 +177,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
protected void setupConfigurableCapacities(
|
protected void setupConfigurableCapacities(
|
||||||
CapacitySchedulerConfiguration configuration) {
|
CapacitySchedulerConfiguration configuration) {
|
||||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(getQueuePath(), queueCapacities,
|
CSQueueUtils.loadCapacitiesByLabelsFromConf(getQueuePath(), queueCapacities,
|
||||||
configuration, configuredNodeLabels);
|
configuration, this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getQueuePath() {
|
public String getQueuePath() {
|
||||||
return queuePath;
|
return queuePath.getFullPath();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -249,7 +236,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getQueueShortName() {
|
public String getQueueShortName() {
|
||||||
return queueName;
|
return queuePath.getLeafName();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -257,11 +244,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return queueName;
|
return queueName;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public PrivilegedEntity getPrivilegedEntity() {
|
|
||||||
return queueEntity;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CSQueue getParent() {
|
public CSQueue getParent() {
|
||||||
return parent;
|
return parent;
|
||||||
|
@ -272,8 +254,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
this.parent = newParentQueue;
|
this.parent = newParentQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PrivilegedEntity getPrivilegedEntity() {
|
||||||
|
return queueEntity;
|
||||||
|
}
|
||||||
|
|
||||||
public Set<String> getAccessibleNodeLabels() {
|
public Set<String> getAccessibleNodeLabels() {
|
||||||
return accessibleLabels;
|
return queueNodeLabelsSettings.getAccessibleNodeLabels();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -331,7 +318,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getDefaultNodeLabelExpression() {
|
public String getDefaultNodeLabelExpression() {
|
||||||
return defaultLabelExpression;
|
return this.queueNodeLabelsSettings.getDefaultLabelExpression();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setupQueueConfigs(Resource clusterResource,
|
protected void setupQueueConfigs(Resource clusterResource,
|
||||||
|
@ -345,7 +332,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect and set the Node label configuration
|
// Collect and set the Node label configuration
|
||||||
initializeNodeLabels(configuration);
|
this.queueNodeLabelsSettings = new QueueNodeLabelsSettings(configuration, parent,
|
||||||
|
getQueuePath(), csContext);
|
||||||
|
|
||||||
// Initialize the queue capacities
|
// Initialize the queue capacities
|
||||||
setupConfigurableCapacities(configuration);
|
setupConfigurableCapacities(configuration);
|
||||||
|
@ -363,7 +351,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
// Setup queue's maximumAllocation respecting the global
|
// Setup queue's maximumAllocation respecting the global
|
||||||
// and the queue settings
|
// and the queue settings
|
||||||
setupMaximumAllocation(configuration);
|
this.queueAllocationSettings.setupMaximumAllocation(configuration, getQueuePath(),
|
||||||
|
parent, csContext);
|
||||||
|
|
||||||
// Initialize the queue state based on previous state, configured state
|
// Initialize the queue state based on previous state, configured state
|
||||||
// and its parent state
|
// and its parent state
|
||||||
|
@ -378,14 +367,15 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
this.reservationsContinueLooking =
|
this.reservationsContinueLooking =
|
||||||
configuration.getReservationContinueLook();
|
configuration.getReservationContinueLook();
|
||||||
this.configuredCapacityVectors = csContext.getConfiguration()
|
this.configuredCapacityVectors = csContext.getConfiguration()
|
||||||
.parseConfiguredResourceVector(queuePath, configuredNodeLabels);
|
.parseConfiguredResourceVector(queuePath.getFullPath(),
|
||||||
|
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
||||||
|
|
||||||
// Update metrics
|
// Update metrics
|
||||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||||
this, labelManager, null);
|
this, labelManager, null);
|
||||||
|
|
||||||
// Store preemption settings
|
// Store preemption settings
|
||||||
this.preemptionSettings = new CSQueuePreemption(this, csContext, configuration);
|
this.preemptionSettings = new CSQueuePreemptionSettings(this, csContext, configuration);
|
||||||
this.priority = configuration.getQueuePriority(
|
this.priority = configuration.getQueuePriority(
|
||||||
getQueuePath());
|
getQueuePath());
|
||||||
|
|
||||||
|
@ -394,7 +384,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));
|
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));
|
||||||
|
|
||||||
// Setup application related limits
|
// Setup application related limits
|
||||||
setupApplicationLimits(configuration);
|
this.queueAppLifetimeSettings = new QueueAppLifetimeAndLimitSettings(configuration,
|
||||||
|
this, getQueuePath());
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -407,11 +398,11 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
protected void setDynamicQueueProperties(
|
protected void setDynamicQueueProperties(
|
||||||
CapacitySchedulerConfiguration configuration) {
|
CapacitySchedulerConfiguration configuration) {
|
||||||
// Set properties from parent template
|
// Set properties from parent template
|
||||||
if (getParent() instanceof ParentQueue) {
|
if (parent instanceof ParentQueue) {
|
||||||
((ParentQueue) getParent()).getAutoCreatedQueueTemplate()
|
((ParentQueue) parent).getAutoCreatedQueueTemplate()
|
||||||
.setTemplateEntriesForChild(configuration, getQueuePath());
|
.setTemplateEntriesForChild(configuration, getQueuePath());
|
||||||
|
|
||||||
String parentTemplate = String.format("%s.%s", getParent().getQueuePath(),
|
String parentTemplate = String.format("%s.%s", parent.getQueuePath(),
|
||||||
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
|
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
|
||||||
parentTemplate = parentTemplate.substring(0, parentTemplate.lastIndexOf(
|
parentTemplate = parentTemplate.substring(0, parentTemplate.lastIndexOf(
|
||||||
DOT));
|
DOT));
|
||||||
|
@ -421,135 +412,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
|
if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
|
||||||
csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
||||||
.setLabelsByQueue(queuePath, new HashSet<>(parentNodeLabels));
|
.setLabelsByQueue(getQueuePath(), new HashSet<>(parentNodeLabels));
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void initializeNodeLabels(
|
|
||||||
CapacitySchedulerConfiguration configuration) throws IOException {
|
|
||||||
// Collect and store labels
|
|
||||||
this.accessibleLabels =
|
|
||||||
configuration.getAccessibleNodeLabels(getQueuePath());
|
|
||||||
this.defaultLabelExpression =
|
|
||||||
configuration.getDefaultNodeLabelExpression(
|
|
||||||
getQueuePath());
|
|
||||||
|
|
||||||
// Inherit labels from parent if not set
|
|
||||||
if (this.accessibleLabels == null && parent != null) {
|
|
||||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the accessible labels is not null and the queue has a parent with a
|
|
||||||
// similar set of labels copy the defaultNodeLabelExpression from the parent
|
|
||||||
if (this.accessibleLabels != null && parent != null
|
|
||||||
&& this.defaultLabelExpression == null &&
|
|
||||||
this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
|
|
||||||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (csContext.getCapacitySchedulerQueueManager() != null
|
|
||||||
&& csContext.getCapacitySchedulerQueueManager()
|
|
||||||
.getConfiguredNodeLabels() != null) {
|
|
||||||
if (getQueuePath().equals(ROOT)) {
|
|
||||||
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
|
||||||
.getConfiguredNodeLabels().getAllConfiguredLabels();
|
|
||||||
} else {
|
|
||||||
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
|
||||||
.getConfiguredNodeLabels().getLabelsByQueue(getQueuePath());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Fallback to suboptimal but correct logic
|
|
||||||
this.configuredNodeLabels = csContext.getConfiguration()
|
|
||||||
.getConfiguredNodeLabels(queuePath);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate the initialized settings
|
|
||||||
validateNodeLabels();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void validateNodeLabels() throws IOException {
|
|
||||||
// Check if labels of this queue is a subset of parent queue, only do this
|
|
||||||
// when the queue in question is not root
|
|
||||||
if (parent != null && parent.getParent() != null) {
|
|
||||||
if (parent.getAccessibleNodeLabels() != null && !parent
|
|
||||||
.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
|
||||||
// if parent isn't "*", child shouldn't be "*" too
|
|
||||||
if (this.getAccessibleNodeLabels().contains(
|
|
||||||
RMNodeLabelsManager.ANY)) {
|
|
||||||
throw new IOException("Parent's accessible queue is not ANY(*), "
|
|
||||||
+ "but child's accessible queue is *");
|
|
||||||
} else{
|
|
||||||
Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(),
|
|
||||||
parent.getAccessibleNodeLabels());
|
|
||||||
if (!diff.isEmpty()) {
|
|
||||||
throw new IOException(
|
|
||||||
"Some labels of child queue is not a subset "
|
|
||||||
+ "of parent queue, these labels=[" + StringUtils
|
|
||||||
.join(diff, ",") + "]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupApplicationLimits(CapacitySchedulerConfiguration configuration) {
|
|
||||||
// Store max parallel apps property
|
|
||||||
this.maxParallelApps = configuration.getMaxParallelAppsForQueue(getQueuePath());
|
|
||||||
|
|
||||||
maxApplicationLifetime = getInheritedMaxAppLifetime(this, configuration);
|
|
||||||
defaultApplicationLifetime =
|
|
||||||
getInheritedDefaultAppLifetime(this, configuration,
|
|
||||||
maxApplicationLifetime);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
|
|
||||||
String myQueuePath = getQueuePath();
|
|
||||||
/* YARN-10869: When using AutoCreatedLeafQueues, the passed configuration
|
|
||||||
* object is a cloned one containing only the template configs
|
|
||||||
* (see ManagedParentQueue#getLeafQueueConfigs). To ensure that the actual
|
|
||||||
* cluster maximum allocation is fetched the original config object should
|
|
||||||
* be used.
|
|
||||||
*/
|
|
||||||
Resource clusterMax = ResourceUtils
|
|
||||||
.fetchMaximumAllocationFromConfig(this.csContext.getConfiguration());
|
|
||||||
Resource queueMax = csConf.getQueueMaximumAllocation(myQueuePath);
|
|
||||||
|
|
||||||
maximumAllocation = Resources.clone(
|
|
||||||
parent == null ? clusterMax : parent.getMaximumAllocation());
|
|
||||||
|
|
||||||
String errMsg =
|
|
||||||
"Queue maximum allocation cannot be larger than the cluster setting"
|
|
||||||
+ " for queue " + myQueuePath
|
|
||||||
+ " max allocation per queue: %s"
|
|
||||||
+ " cluster setting: " + clusterMax;
|
|
||||||
|
|
||||||
if (queueMax == Resources.none()) {
|
|
||||||
// Handle backward compatibility
|
|
||||||
long queueMemory = csConf.getQueueMaximumAllocationMb(myQueuePath);
|
|
||||||
int queueVcores = csConf.getQueueMaximumAllocationVcores(myQueuePath);
|
|
||||||
if (queueMemory != UNDEFINED) {
|
|
||||||
maximumAllocation.setMemorySize(queueMemory);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queueVcores != UNDEFINED) {
|
|
||||||
maximumAllocation.setVirtualCores(queueVcores);
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((queueMemory != UNDEFINED && queueMemory > clusterMax.getMemorySize()
|
|
||||||
|| (queueVcores != UNDEFINED
|
|
||||||
&& queueVcores > clusterMax.getVirtualCores()))) {
|
|
||||||
throw new IllegalArgumentException(
|
|
||||||
String.format(errMsg, maximumAllocation));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Queue level maximum-allocation can't be larger than cluster setting
|
|
||||||
for (ResourceInformation ri : queueMax.getResources()) {
|
|
||||||
if (ri.compareTo(clusterMax.getResourceInformation(ri.getName())) > 0) {
|
|
||||||
throw new IllegalArgumentException(String.format(errMsg, queueMax));
|
|
||||||
}
|
|
||||||
|
|
||||||
maximumAllocation.setResourceInformation(ri.getName(), ri);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -557,7 +420,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
private UserWeights getUserWeightsFromHierarchy(
|
private UserWeights getUserWeightsFromHierarchy(
|
||||||
CapacitySchedulerConfiguration configuration) {
|
CapacitySchedulerConfiguration configuration) {
|
||||||
UserWeights unionInheritedWeights = UserWeights.createEmpty();
|
UserWeights unionInheritedWeights = UserWeights.createEmpty();
|
||||||
CSQueue parentQ = getParent();
|
CSQueue parentQ = parent;
|
||||||
if (parentQ != null) {
|
if (parentQ != null) {
|
||||||
// Inherit all of parent's userWeights
|
// Inherit all of parent's userWeights
|
||||||
unionInheritedWeights.addFrom(parentQ.getUserWeights());
|
unionInheritedWeights.addFrom(parentQ.getUserWeights());
|
||||||
|
@ -589,12 +452,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
protected void updateCapacityConfigType() {
|
protected void updateCapacityConfigType() {
|
||||||
this.capacityConfigType = CapacityConfigType.NONE;
|
this.capacityConfigType = CapacityConfigType.NONE;
|
||||||
for (String label : configuredNodeLabels) {
|
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
|
||||||
LOG.debug("capacityConfigType is '{}' for queue {}",
|
LOG.debug("capacityConfigType is '{}' for queue {}",
|
||||||
capacityConfigType, getQueuePath());
|
capacityConfigType, getQueuePath());
|
||||||
|
|
||||||
CapacityConfigType localType = checkConfigTypeIsAbsoluteResource(
|
CapacityConfigType localType = checkConfigTypeIsAbsoluteResource(
|
||||||
queuePath, label) ? CapacityConfigType.ABSOLUTE_RESOURCE
|
getQueuePath(), label) ? CapacityConfigType.ABSOLUTE_RESOURCE
|
||||||
: CapacityConfigType.PERCENTAGE;
|
: CapacityConfigType.PERCENTAGE;
|
||||||
|
|
||||||
if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
|
if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
|
||||||
|
@ -608,12 +471,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void updateConfigurableResourceLimits(Resource clusterResource) {
|
protected void updateConfigurableResourceLimits(Resource clusterResource) {
|
||||||
for (String label : configuredNodeLabels) {
|
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
|
||||||
final Resource minResource = getMinimumAbsoluteResource(queuePath, label);
|
final Resource minResource = getMinimumAbsoluteResource(getQueuePath(), label);
|
||||||
Resource maxResource = getMaximumAbsoluteResource(queuePath, label);
|
Resource maxResource = getMaximumAbsoluteResource(getQueuePath(), label);
|
||||||
|
|
||||||
if (parent != null) {
|
if (parent != null) {
|
||||||
final Resource parentMax = parent.getQueueResourceQuotas().getConfiguredMaxResource(label);
|
final Resource parentMax = parent.getQueueResourceQuotas()
|
||||||
|
.getConfiguredMaxResource(label);
|
||||||
validateMinResourceIsNotGreaterThanMaxResource(maxResource, parentMax, clusterResource,
|
validateMinResourceIsNotGreaterThanMaxResource(maxResource, parentMax, clusterResource,
|
||||||
"Max resource configuration "
|
"Max resource configuration "
|
||||||
+ maxResource + " is greater than parents max value:"
|
+ maxResource + " is greater than parents max value:"
|
||||||
|
@ -654,7 +518,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
private void validateAbsoluteVsPercentageCapacityConfig(
|
private void validateAbsoluteVsPercentageCapacityConfig(
|
||||||
CapacityConfigType localType) {
|
CapacityConfigType localType) {
|
||||||
if (!queuePath.equals("root")
|
if (!getQueuePath().equals("root")
|
||||||
&& !this.capacityConfigType.equals(localType)) {
|
&& !this.capacityConfigType.equals(localType)) {
|
||||||
throw new IllegalArgumentException("Queue '" + getQueuePath()
|
throw new IllegalArgumentException("Queue '" + getQueuePath()
|
||||||
+ "' should use either percentage based capacity"
|
+ "' should use either percentage based capacity"
|
||||||
|
@ -677,7 +541,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public Resource getEffectiveCapacityDown(String label, Resource factor) {
|
public Resource getEffectiveCapacityDown(String label, Resource factor) {
|
||||||
return Resources.normalizeDown(resourceCalculator,
|
return Resources.normalizeDown(resourceCalculator,
|
||||||
getQueueResourceQuotas().getEffectiveMinResource(label),
|
getQueueResourceQuotas().getEffectiveMinResource(label),
|
||||||
minimumAllocation);
|
queueAllocationSettings.getMinimumAllocation());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -690,7 +554,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
|
public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
|
||||||
return Resources.normalizeDown(resourceCalculator,
|
return Resources.normalizeDown(resourceCalculator,
|
||||||
getQueueResourceQuotas().getEffectiveMaxResource(label),
|
getQueueResourceQuotas().getEffectiveMaxResource(label),
|
||||||
minimumAllocation);
|
queueAllocationSettings.getMinimumAllocation());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -729,7 +593,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
&& parentState != QueueState.RUNNING) {
|
&& parentState != QueueState.RUNNING) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"The parent queue:" + parent.getQueuePath()
|
"The parent queue:" + parent.getQueuePath()
|
||||||
+ " cannot be STOPPED as the child queue:" + queuePath
|
+ " cannot be STOPPED as the child queue:" + getQueuePath()
|
||||||
+ " is in RUNNING state.");
|
+ " is in RUNNING state.");
|
||||||
} else {
|
} else {
|
||||||
updateQueueState(configuredState);
|
updateQueueState(configuredState);
|
||||||
|
@ -760,20 +624,20 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
// TODO, improve this
|
// TODO, improve this
|
||||||
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||||
queueInfo.setQueueName(queueName);
|
queueInfo.setQueueName(queueName);
|
||||||
queueInfo.setQueuePath(queuePath);
|
queueInfo.setQueuePath(getQueuePath());
|
||||||
queueInfo.setAccessibleNodeLabels(accessibleLabels);
|
queueInfo.setAccessibleNodeLabels(queueNodeLabelsSettings.getAccessibleNodeLabels());
|
||||||
queueInfo.setCapacity(queueCapacities.getCapacity());
|
queueInfo.setCapacity(queueCapacities.getCapacity());
|
||||||
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
|
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
|
||||||
queueInfo.setQueueState(getState());
|
queueInfo.setQueueState(getState());
|
||||||
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
|
queueInfo.setDefaultNodeLabelExpression(queueNodeLabelsSettings.getDefaultLabelExpression());
|
||||||
queueInfo.setCurrentCapacity(getUsedCapacity());
|
queueInfo.setCurrentCapacity(getUsedCapacity());
|
||||||
queueInfo.setQueueStatistics(getQueueStatistics());
|
queueInfo.setQueueStatistics(getQueueStatistics());
|
||||||
queueInfo.setPreemptionDisabled(getPreemptionDisabled());
|
queueInfo.setPreemptionDisabled(preemptionSettings.isPreemptionDisabled());
|
||||||
queueInfo.setIntraQueuePreemptionDisabled(
|
queueInfo.setIntraQueuePreemptionDisabled(
|
||||||
getIntraQueuePreemptionDisabled());
|
getIntraQueuePreemptionDisabled());
|
||||||
queueInfo.setQueueConfigurations(getQueueConfigurations());
|
queueInfo.setQueueConfigurations(getQueueConfigurations());
|
||||||
queueInfo.setWeight(queueCapacities.getWeight());
|
queueInfo.setWeight(queueCapacities.getWeight());
|
||||||
queueInfo.setMaxParallelApps(maxParallelApps);
|
queueInfo.setMaxParallelApps(queueAppLifetimeSettings.getMaxParallelApps());
|
||||||
return queueInfo;
|
return queueInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -839,12 +703,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public Resource getMaximumAllocation() {
|
public Resource getMaximumAllocation() {
|
||||||
return maximumAllocation;
|
return queueAllocationSettings.getMaximumAllocation();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public Resource getMinimumAllocation() {
|
public Resource getMinimumAllocation() {
|
||||||
return minimumAllocation;
|
return queueAllocationSettings.getMinimumAllocation();
|
||||||
}
|
}
|
||||||
|
|
||||||
void allocateResource(Resource clusterResource,
|
void allocateResource(Resource clusterResource,
|
||||||
|
@ -897,8 +761,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public boolean getIntraQueuePreemptionDisabled() {
|
public boolean getIntraQueuePreemptionDisabled() {
|
||||||
return preemptionSettings.isIntraQueuePreemptionDisabledInHierarchy() ||
|
return preemptionSettings.getIntraQueuePreemptionDisabled();
|
||||||
preemptionSettings.isPreemptionDisabled();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -926,76 +789,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return readLock;
|
return readLock;
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getInheritedMaxAppLifetime(CSQueue q,
|
|
||||||
CapacitySchedulerConfiguration conf) {
|
|
||||||
CSQueue parentQ = q.getParent();
|
|
||||||
long maxAppLifetime = conf.getMaximumLifetimePerQueue(q.getQueuePath());
|
|
||||||
|
|
||||||
// If q is the root queue, then get max app lifetime from conf.
|
|
||||||
if (parentQ == null) {
|
|
||||||
return maxAppLifetime;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this is not the root queue, get this queue's max app lifetime
|
|
||||||
// from the conf. The parent's max app lifetime will be used if it's
|
|
||||||
// not set for this queue.
|
|
||||||
// A value of 0 will override the parent's value and means no max lifetime.
|
|
||||||
// A negative value means that the parent's max should be used.
|
|
||||||
long parentsMaxAppLifetime = getParent().getMaximumApplicationLifetime();
|
|
||||||
return (maxAppLifetime >= 0) ? maxAppLifetime : parentsMaxAppLifetime;
|
|
||||||
}
|
|
||||||
|
|
||||||
private long getInheritedDefaultAppLifetime(CSQueue q,
|
|
||||||
CapacitySchedulerConfiguration conf, long myMaxAppLifetime) {
|
|
||||||
CSQueue parentQ = q.getParent();
|
|
||||||
long defaultAppLifetime = conf.getDefaultLifetimePerQueue(getQueuePath());
|
|
||||||
defaultAppLifetimeWasSpecifiedInConfig =
|
|
||||||
(defaultAppLifetime >= 0
|
|
||||||
|| (parentQ != null &&
|
|
||||||
parentQ.getDefaultAppLifetimeWasSpecifiedInConfig()));
|
|
||||||
|
|
||||||
// If q is the root queue, then get default app lifetime from conf.
|
|
||||||
if (parentQ == null) {
|
|
||||||
return defaultAppLifetime;
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this is not the root queue, get the parent's default app lifetime. The
|
|
||||||
// parent's default app lifetime will be used if not set for this queue.
|
|
||||||
long parentsDefaultAppLifetime =
|
|
||||||
getParent().getDefaultApplicationLifetime();
|
|
||||||
|
|
||||||
// Negative value indicates default lifetime was not set at this level.
|
|
||||||
// If default lifetime was not set at this level, calculate it based on
|
|
||||||
// parent's default lifetime or current queue's max lifetime.
|
|
||||||
if (defaultAppLifetime < 0) {
|
|
||||||
// If default lifetime was not set at this level but was set somewhere in
|
|
||||||
// the parent's hierarchy, set default lifetime to parent queue's default
|
|
||||||
// only if parent queue's lifetime is less than current queue's max
|
|
||||||
// lifetime. Otherwise, use current queue's max lifetime value for its
|
|
||||||
// default lifetime.
|
|
||||||
if (defaultAppLifetimeWasSpecifiedInConfig) {
|
|
||||||
defaultAppLifetime =
|
|
||||||
Math.min(parentsDefaultAppLifetime, myMaxAppLifetime);
|
|
||||||
} else {
|
|
||||||
// Default app lifetime value was not set anywhere in this queue's
|
|
||||||
// hierarchy. Use current queue's max lifetime as its default.
|
|
||||||
defaultAppLifetime = myMaxAppLifetime;
|
|
||||||
}
|
|
||||||
} // else if >= 0, default lifetime was set at this level. Just use it.
|
|
||||||
|
|
||||||
if (myMaxAppLifetime > 0 &&
|
|
||||||
defaultAppLifetime > myMaxAppLifetime) {
|
|
||||||
throw new YarnRuntimeException(
|
|
||||||
"Default lifetime " + defaultAppLifetime
|
|
||||||
+ " can't exceed maximum lifetime " + myMaxAppLifetime);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (defaultAppLifetime <= 0) {
|
|
||||||
defaultAppLifetime = myMaxAppLifetime;
|
|
||||||
}
|
|
||||||
return defaultAppLifetime;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Resource getCurrentLimitResource(String nodePartition,
|
private Resource getCurrentLimitResource(String nodePartition,
|
||||||
Resource clusterResource, ResourceLimits currentResourceLimits,
|
Resource clusterResource, ResourceLimits currentResourceLimits,
|
||||||
SchedulingMode schedulingMode) {
|
SchedulingMode schedulingMode) {
|
||||||
|
@ -1201,25 +994,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
usageTracker.getQueueUsage(), nodePartition, cluster, schedulingMode);
|
usageTracker.getQueueUsage(), nodePartition, cluster, schedulingMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean accessibleToPartition(String nodePartition) {
|
|
||||||
// if queue's label is *, it can access any node
|
|
||||||
if (accessibleLabels != null
|
|
||||||
&& accessibleLabels.contains(RMNodeLabelsManager.ANY)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// any queue can access to a node without label
|
|
||||||
if (nodePartition == null
|
|
||||||
|| nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// a queue can access to a node only if it contains any label of the node
|
|
||||||
if (accessibleLabels != null && accessibleLabels.contains(nodePartition)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// sorry, you cannot access
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Priority getDefaultApplicationPriority() {
|
public Priority getDefaultApplicationPriority() {
|
||||||
// TODO add dummy implementation
|
// TODO add dummy implementation
|
||||||
|
@ -1341,7 +1115,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
LOG.info("The specified queue:" + getQueuePath()
|
LOG.info("The specified queue:" + getQueuePath()
|
||||||
+ " is already in the RUNNING state.");
|
+ " is already in the RUNNING state.");
|
||||||
} else {
|
} else {
|
||||||
CSQueue parentQueue = getParent();
|
CSQueue parentQueue = parent;
|
||||||
if (parentQueue == null || parentQueue.getState() == QueueState.RUNNING) {
|
if (parentQueue == null || parentQueue.getState() == QueueState.RUNNING) {
|
||||||
updateQueueState(QueueState.RUNNING);
|
updateQueueState(QueueState.RUNNING);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1384,8 +1158,8 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
updateQueueState(QueueState.DRAINING);
|
updateQueueState(QueueState.DRAINING);
|
||||||
}
|
}
|
||||||
LOG.info("Recover draining state for queue " + this.getQueuePath());
|
LOG.info("Recover draining state for queue " + this.getQueuePath());
|
||||||
if (getParent() != null && getParent().getState() == QueueState.STOPPED) {
|
if (parent != null && parent.getState() == QueueState.STOPPED) {
|
||||||
((AbstractCSQueue) getParent()).recoverDrainingState();
|
((AbstractCSQueue) parent).recoverDrainingState();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
this.writeLock.unlock();
|
this.writeLock.unlock();
|
||||||
|
@ -1402,24 +1176,24 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMaximumApplicationLifetime() {
|
public long getMaximumApplicationLifetime() {
|
||||||
return maxApplicationLifetime;
|
return queueAppLifetimeSettings.getMaxApplicationLifetime();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getDefaultApplicationLifetime() {
|
public long getDefaultApplicationLifetime() {
|
||||||
return defaultApplicationLifetime;
|
return queueAppLifetimeSettings.getDefaultApplicationLifetime();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean getDefaultAppLifetimeWasSpecifiedInConfig() {
|
public boolean getDefaultAppLifetimeWasSpecifiedInConfig() {
|
||||||
return defaultAppLifetimeWasSpecifiedInConfig;
|
return queueAppLifetimeSettings.isDefaultAppLifetimeWasSpecifiedInConfig();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMaxParallelApps(int maxParallelApps) {
|
public void setMaxParallelApps(int maxParallelApps) {
|
||||||
this.maxParallelApps = maxParallelApps;
|
this.queueAppLifetimeSettings.setMaxParallelApps(maxParallelApps);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getMaxParallelApps() {
|
public int getMaxParallelApps() {
|
||||||
return maxParallelApps;
|
return this.queueAppLifetimeSettings.getMaxParallelApps();
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract int getNumRunnableApps();
|
abstract int getNumRunnableApps();
|
||||||
|
@ -1447,7 +1221,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
ret.setResourceValue(i,
|
ret.setResourceValue(i,
|
||||||
(long) (nResourceInformation.getValue() * ratio));
|
(long) (nResourceInformation.getValue() * ratio));
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Updating min resource for Queue: " + queuePath + " as " + ret
|
LOG.debug("Updating min resource for Queue: " + getQueuePath() + " as " + ret
|
||||||
.getResourceInformation(i) + ", Actual resource: "
|
.getResourceInformation(i) + ", Actual resource: "
|
||||||
+ nResourceInformation.getValue() + ", ratio: " + ratio);
|
+ nResourceInformation.getValue() + ", ratio: " + ratio);
|
||||||
}
|
}
|
||||||
|
@ -1513,7 +1287,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateEffectiveResources(Resource clusterResource) {
|
void updateEffectiveResources(Resource clusterResource) {
|
||||||
for (String label : configuredNodeLabels) {
|
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
|
||||||
Resource resourceByLabel = labelManager.getResourceByLabel(label,
|
Resource resourceByLabel = labelManager.getResourceByLabel(label,
|
||||||
clusterResource);
|
clusterResource);
|
||||||
Resource newEffectiveMinResource;
|
Resource newEffectiveMinResource;
|
||||||
|
@ -1549,7 +1323,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
newEffectiveMaxResource);
|
newEffectiveMaxResource);
|
||||||
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Updating queue:" + queuePath
|
LOG.debug("Updating queue:" + getQueuePath()
|
||||||
+ " with effective minimum resource=" + newEffectiveMinResource
|
+ " with effective minimum resource=" + newEffectiveMinResource
|
||||||
+ "and effective maximum resource="
|
+ "and effective maximum resource="
|
||||||
+ newEffectiveMaxResource);
|
+ newEffectiveMaxResource);
|
||||||
|
|
|
@ -18,13 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
|
||||||
public class CSQueuePreemption {
|
public class CSQueuePreemptionSettings {
|
||||||
private final boolean preemptionDisabled;
|
private final boolean preemptionDisabled;
|
||||||
// Indicates if the in-queue preemption setting is ever disabled within the
|
// Indicates if the in-queue preemption setting is ever disabled within the
|
||||||
// hierarchy of this queue.
|
// hierarchy of this queue.
|
||||||
private final boolean intraQueuePreemptionDisabledInHierarchy;
|
private final boolean intraQueuePreemptionDisabledInHierarchy;
|
||||||
|
|
||||||
public CSQueuePreemption(
|
public CSQueuePreemptionSettings(
|
||||||
CSQueue queue,
|
CSQueue queue,
|
||||||
CapacitySchedulerContext csContext,
|
CapacitySchedulerContext csContext,
|
||||||
CapacitySchedulerConfiguration configuration) {
|
CapacitySchedulerConfiguration configuration) {
|
||||||
|
@ -109,6 +109,10 @@ public class CSQueuePreemption {
|
||||||
parentQ.getIntraQueuePreemptionDisabledInHierarchy());
|
parentQ.getIntraQueuePreemptionDisabledInHierarchy());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getIntraQueuePreemptionDisabled() {
|
||||||
|
return intraQueuePreemptionDisabledInHierarchy || preemptionDisabled;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isIntraQueuePreemptionDisabledInHierarchy() {
|
public boolean isIntraQueuePreemptionDisabledInHierarchy() {
|
||||||
return intraQueuePreemptionDisabledInHierarchy;
|
return intraQueuePreemptionDisabledInHierarchy;
|
||||||
}
|
}
|
|
@ -215,15 +215,14 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
priorityAcls = conf.getPriorityAcls(getQueuePath(),
|
priorityAcls = conf.getPriorityAcls(getQueuePath(),
|
||||||
csContext.getMaxClusterLevelAppPriority());
|
csContext.getMaxClusterLevelAppPriority());
|
||||||
|
|
||||||
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
|
Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
|
||||||
this.defaultLabelExpression, null)) {
|
if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
|
||||||
|
this.queueNodeLabelsSettings.getDefaultLabelExpression(), null)) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"Invalid default label expression of " + " queue=" + getQueuePath()
|
"Invalid default label expression of " + " queue=" + getQueuePath()
|
||||||
+ " doesn't have permission to access all labels "
|
+ " doesn't have permission to access all labels "
|
||||||
+ "in default label expression. labelExpression of resource request="
|
+ "in default label expression. labelExpression of resource request="
|
||||||
+ (this.defaultLabelExpression == null ?
|
+ getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
|
||||||
"" :
|
|
||||||
this.defaultLabelExpression) + ". Queue labels=" + (
|
|
||||||
getAccessibleNodeLabels() == null ?
|
getAccessibleNodeLabels() == null ?
|
||||||
"" :
|
"" :
|
||||||
StringUtils
|
StringUtils
|
||||||
|
@ -238,8 +237,10 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// re-init this since max allocation could have changed
|
// re-init this since max allocation could have changed
|
||||||
this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
|
this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
|
||||||
Resources.subtract(maximumAllocation, minimumAllocation),
|
Resources.subtract(
|
||||||
maximumAllocation);
|
queueAllocationSettings.getMaximumAllocation(),
|
||||||
|
queueAllocationSettings.getMinimumAllocation()),
|
||||||
|
queueAllocationSettings.getMaximumAllocation());
|
||||||
|
|
||||||
StringBuilder aclsString = new StringBuilder();
|
StringBuilder aclsString = new StringBuilder();
|
||||||
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
|
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
|
||||||
|
@ -247,10 +248,9 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
StringBuilder labelStrBuilder = new StringBuilder();
|
StringBuilder labelStrBuilder = new StringBuilder();
|
||||||
if (accessibleLabels != null) {
|
if (accessibleNodeLabels != null) {
|
||||||
for (String s : accessibleLabels) {
|
for (String nodeLabel : accessibleNodeLabels) {
|
||||||
labelStrBuilder.append(s)
|
labelStrBuilder.append(nodeLabel).append(",");
|
||||||
.append(",");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -297,7 +297,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
+ "minimumAllocationFactor = " + minimumAllocationFactor
|
+ "minimumAllocationFactor = " + minimumAllocationFactor
|
||||||
+ " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
|
+ " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
|
||||||
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
|
+ "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
|
||||||
+ maximumAllocation + " [= configuredMaxAllocation ]" + "\n"
|
+ queueAllocationSettings.getMaximumAllocation() +
|
||||||
|
" [= configuredMaxAllocation ]" + "\n"
|
||||||
+ "numContainers = " + usageTracker.getNumContainers()
|
+ "numContainers = " + usageTracker.getNumContainers()
|
||||||
+ " [= currentNumContainers ]" + "\n" + "state = " + getState()
|
+ " [= currentNumContainers ]" + "\n" + "state = " + getState()
|
||||||
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
|
+ " [= configuredState ]" + "\n" + "acls = " + aclsString
|
||||||
|
@ -318,6 +319,11 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getDefaultNodeLabelExpressionStr() {
|
||||||
|
String defaultLabelExpression = queueNodeLabelsSettings.getDefaultLabelExpression();
|
||||||
|
return defaultLabelExpression == null ? "" : defaultLabelExpression;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used only by tests.
|
* Used only by tests.
|
||||||
*/
|
*/
|
||||||
|
@ -602,7 +608,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
|
usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
|
||||||
}
|
}
|
||||||
|
|
||||||
getParent().submitApplicationAttempt(application, userName);
|
parent.submitApplicationAttempt(application, userName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -616,10 +622,10 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// Inform the parent queue
|
// Inform the parent queue
|
||||||
try {
|
try {
|
||||||
getParent().submitApplication(applicationId, userName, queue);
|
parent.submitApplication(applicationId, userName, queue);
|
||||||
} catch (AccessControlException ace) {
|
} catch (AccessControlException ace) {
|
||||||
LOG.info("Failed to submit application to parent-queue: " +
|
LOG.info("Failed to submit application to parent-queue: " +
|
||||||
getParent().getQueuePath(), ace);
|
parent.getQueuePath(), ace);
|
||||||
throw ace;
|
throw ace;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -664,10 +670,10 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
getParent().validateSubmitApplication(applicationId, userName, queue);
|
parent.validateSubmitApplication(applicationId, userName, queue);
|
||||||
} catch (AccessControlException ace) {
|
} catch (AccessControlException ace) {
|
||||||
LOG.info("Failed to submit application to parent-queue: " +
|
LOG.info("Failed to submit application to parent-queue: " +
|
||||||
getParent().getQueuePath(), ace);
|
parent.getQueuePath(), ace);
|
||||||
throw ace;
|
throw ace;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -714,6 +720,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
|
Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
|
||||||
|
|
||||||
|
Resource minimumAllocation = queueAllocationSettings.getMinimumAllocation();
|
||||||
|
|
||||||
Resource userAMLimit = Resources.multiplyAndNormalizeUp(
|
Resource userAMLimit = Resources.multiplyAndNormalizeUp(
|
||||||
resourceCalculator, queuePartitionResource,
|
resourceCalculator, queuePartitionResource,
|
||||||
queueCapacities.getMaxAMResourcePercentage(nodePartition)
|
queueCapacities.getMaxAMResourcePercentage(nodePartition)
|
||||||
|
@ -800,7 +808,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
|
Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
|
||||||
resourceCalculator, queuePartitionUsableResource, amResourcePercent,
|
resourceCalculator, queuePartitionUsableResource, amResourcePercent,
|
||||||
minimumAllocation);
|
queueAllocationSettings.getMinimumAllocation());
|
||||||
|
|
||||||
usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
|
usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
|
||||||
usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
|
usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
|
||||||
|
@ -987,14 +995,14 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
appFinished();
|
appFinished();
|
||||||
|
|
||||||
// Inform the parent queue
|
// Inform the parent queue
|
||||||
getParent().finishApplication(application, user);
|
parent.finishApplication(application, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
|
public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
|
||||||
// Careful! Locking order is important!
|
// Careful! Locking order is important!
|
||||||
removeApplicationAttempt(application, application.getUser());
|
removeApplicationAttempt(application, application.getUser());
|
||||||
getParent().finishApplicationAttempt(application, queue);
|
parent.finishApplicationAttempt(application, queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeApplicationAttempt(
|
private void removeApplicationAttempt(
|
||||||
|
@ -1165,9 +1173,9 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// if our queue cannot access this node, just return
|
// if our queue cannot access this node, just return
|
||||||
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
|
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
|
||||||
&& !accessibleToPartition(candidates.getPartition())) {
|
&& !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
|
parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
|
||||||
ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
|
ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
|
||||||
return CSAssignment.NULL_ASSIGNMENT;
|
return CSAssignment.NULL_ASSIGNMENT;
|
||||||
}
|
}
|
||||||
|
@ -1183,7 +1191,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
.getPartition());
|
.getPartition());
|
||||||
}
|
}
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
|
parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
|
||||||
ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
|
ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
|
||||||
return CSAssignment.NULL_ASSIGNMENT;
|
return CSAssignment.NULL_ASSIGNMENT;
|
||||||
}
|
}
|
||||||
|
@ -1211,7 +1219,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
activitiesManager, node, application, application.getPriority(),
|
activitiesManager, node, application, application.getPriority(),
|
||||||
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
|
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
getParent().getQueuePath(), getQueuePath(),
|
parent.getQueuePath(), getQueuePath(),
|
||||||
ActivityState.REJECTED,
|
ActivityState.REJECTED,
|
||||||
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
|
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
|
||||||
return CSAssignment.NULL_ASSIGNMENT;
|
return CSAssignment.NULL_ASSIGNMENT;
|
||||||
|
@ -1280,7 +1288,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
|
if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
|
||||||
Resources.none())) {
|
Resources.none())) {
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
getParent().getQueuePath(), getQueuePath(),
|
parent.getQueuePath(), getQueuePath(),
|
||||||
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
|
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
|
||||||
return assignment;
|
return assignment;
|
||||||
} else if (assignment.getSkippedType()
|
} else if (assignment.getSkippedType()
|
||||||
|
@ -1292,7 +1300,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
} else if (assignment.getSkippedType()
|
} else if (assignment.getSkippedType()
|
||||||
== CSAssignment.SkippedType.QUEUE_LIMIT) {
|
== CSAssignment.SkippedType.QUEUE_LIMIT) {
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
getParent().getQueuePath(), getQueuePath(), ActivityState.REJECTED,
|
parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
|
||||||
() -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
|
() -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
|
||||||
+ " from " + application.getApplicationId());
|
+ " from " + application.getApplicationId());
|
||||||
return assignment;
|
return assignment;
|
||||||
|
@ -1300,7 +1308,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// If we don't allocate anything, and it is not skipped by application,
|
// If we don't allocate anything, and it is not skipped by application,
|
||||||
// we will return to respect FIFO of applications
|
// we will return to respect FIFO of applications
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
|
parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
|
||||||
ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
|
ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
|
||||||
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
|
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
|
||||||
activitiesManager, application.getApplicationId(),
|
activitiesManager, application.getApplicationId(),
|
||||||
|
@ -1309,7 +1317,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
getParent().getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
|
parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
|
||||||
ActivityDiagnosticConstant.EMPTY);
|
ActivityDiagnosticConstant.EMPTY);
|
||||||
|
|
||||||
return CSAssignment.NULL_ASSIGNMENT;
|
return CSAssignment.NULL_ASSIGNMENT;
|
||||||
|
@ -1516,7 +1524,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
usageTracker.getQueueUsage().getUsed(partition)));
|
usageTracker.getQueueUsage().getUsed(partition)));
|
||||||
// Normalize it before return
|
// Normalize it before return
|
||||||
headroom =
|
headroom =
|
||||||
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
|
Resources.roundDown(resourceCalculator, headroom,
|
||||||
|
queueAllocationSettings.getMinimumAllocation());
|
||||||
|
|
||||||
//headroom = min (unused resourcelimit of a label, calculated headroom )
|
//headroom = min (unused resourcelimit of a label, calculated headroom )
|
||||||
Resource clusterPartitionResource =
|
Resource clusterPartitionResource =
|
||||||
|
@ -1795,7 +1804,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
if (removed) {
|
if (removed) {
|
||||||
// Inform the parent queue _outside_ of the leaf-queue lock
|
// Inform the parent queue _outside_ of the leaf-queue lock
|
||||||
getParent().completedContainer(clusterResource, application, node,
|
parent.completedContainer(clusterResource, application, node,
|
||||||
rmContainer, null, event, this, sortQueues);
|
rmContainer, null, event, this, sortQueues);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1922,7 +1931,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
this.cachedResourceLimitsForHeadroom =
|
this.cachedResourceLimitsForHeadroom =
|
||||||
new ResourceLimits(currentResourceLimits.getLimit());
|
new ResourceLimits(currentResourceLimits.getLimit());
|
||||||
Resource queueMaxResource = getEffectiveMaxCapacityDown(
|
Resource queueMaxResource = getEffectiveMaxCapacityDown(
|
||||||
RMNodeLabelsManager.NO_LABEL, minimumAllocation);
|
RMNodeLabelsManager.NO_LABEL, queueAllocationSettings.getMinimumAllocation());
|
||||||
this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
|
this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
|
||||||
resourceCalculator, clusterResource, queueMaxResource,
|
resourceCalculator, clusterResource, queueMaxResource,
|
||||||
currentResourceLimits.getLimit()));
|
currentResourceLimits.getLimit()));
|
||||||
|
@ -2033,7 +2042,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
getParent().recoverContainer(clusterResource, attempt, rmContainer);
|
parent.recoverContainer(clusterResource, attempt, rmContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2161,7 +2170,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
||||||
+ usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
|
+ usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
|
||||||
// Inform the parent queue
|
// Inform the parent queue
|
||||||
getParent().attachContainer(clusterResource, application, rmContainer);
|
parent.attachContainer(clusterResource, application, rmContainer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2181,7 +2190,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
|
||||||
+ usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
|
+ usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
|
||||||
// Inform the parent queue
|
// Inform the parent queue
|
||||||
getParent().detachContainer(clusterResource, application, rmContainer);
|
parent.detachContainer(clusterResource, application, rmContainer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2341,7 +2350,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
!= CapacityConfigType.ABSOLUTE_RESOURCE) {
|
!= CapacityConfigType.ABSOLUTE_RESOURCE) {
|
||||||
maxAppsForQueue = baseMaxApplications;
|
maxAppsForQueue = baseMaxApplications;
|
||||||
} else {
|
} else {
|
||||||
for (String label : configuredNodeLabels) {
|
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
|
||||||
int maxApplicationsByLabel = (int) (baseMaxApplications
|
int maxApplicationsByLabel = (int) (baseMaxApplications
|
||||||
* queueCapacities.getAbsoluteCapacity(label));
|
* queueCapacities.getAbsoluteCapacity(label));
|
||||||
if (maxApplicationsByLabel > maxAppsForQueue) {
|
if (maxApplicationsByLabel > maxAppsForQueue) {
|
||||||
|
|
|
@ -169,10 +169,9 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
StringBuilder labelStrBuilder = new StringBuilder();
|
StringBuilder labelStrBuilder = new StringBuilder();
|
||||||
if (accessibleLabels != null) {
|
if (queueNodeLabelsSettings.getAccessibleNodeLabels() != null) {
|
||||||
for (String s : accessibleLabels) {
|
for (String nodeLabel : queueNodeLabelsSettings.getAccessibleNodeLabels()) {
|
||||||
labelStrBuilder.append(s)
|
labelStrBuilder.append(nodeLabel).append(",");
|
||||||
.append(",");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -757,7 +756,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
try {
|
try {
|
||||||
parent.submitApplication(applicationId, user, queue);
|
parent.submitApplication(applicationId, user, queue);
|
||||||
} catch (AccessControlException ace) {
|
} catch (AccessControlException ace) {
|
||||||
LOG.info("Failed to submit application to parent-queue: " +
|
LOG.info("Failed to submit application to parent-queue: " +
|
||||||
parent.getQueuePath(), ace);
|
parent.getQueuePath(), ace);
|
||||||
removeApplication(applicationId, user);
|
removeApplication(applicationId, user);
|
||||||
throw ace;
|
throw ace;
|
||||||
|
@ -846,7 +845,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getParentName() {
|
private String getParentName() {
|
||||||
return getParent() != null ? getParent().getQueuePath() : "";
|
return parent != null ? parent.getQueuePath() : "";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -857,7 +856,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// if our queue cannot access this node, just return
|
// if our queue cannot access this node, just return
|
||||||
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
|
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
|
||||||
&& !accessibleToPartition(candidates.getPartition())) {
|
&& !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
// Do logging every 1 sec to avoid excessive logging.
|
// Do logging every 1 sec to avoid excessive logging.
|
||||||
|
@ -1038,7 +1037,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
// 1) Node doesn't have reserved container
|
// 1) Node doesn't have reserved container
|
||||||
// 2) Node's available-resource + killable-resource should > 0
|
// 2) Node's available-resource + killable-resource should > 0
|
||||||
boolean accept = node.getReservedContainer() == null &&
|
boolean accept = node.getReservedContainer() == null &&
|
||||||
Resources.fitsIn(resourceCalculator, minimumAllocation,
|
Resources.fitsIn(resourceCalculator, queueAllocationSettings.getMinimumAllocation(),
|
||||||
Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources()));
|
Resources.add(node.getUnallocatedResource(), node.getTotalKillableResources()));
|
||||||
if (!accept) {
|
if (!accept) {
|
||||||
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
||||||
|
@ -1085,7 +1084,8 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// Normalize before return
|
// Normalize before return
|
||||||
childLimit =
|
childLimit =
|
||||||
Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
|
Resources.roundDown(resourceCalculator, childLimit,
|
||||||
|
queueAllocationSettings.getMinimumAllocation());
|
||||||
|
|
||||||
return new ResourceLimits(childLimit);
|
return new ResourceLimits(childLimit);
|
||||||
}
|
}
|
||||||
|
@ -1270,7 +1270,7 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update effective capacity in all parent queue.
|
// Update effective capacity in all parent queue.
|
||||||
for (String label : configuredNodeLabels) {
|
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
|
||||||
calculateEffectiveResourcesAndCapacity(label, clusterResource);
|
calculateEffectiveResourcesAndCapacity(label, clusterResource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,97 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class determines minimum and maximum allocation settings based on the
|
||||||
|
* {@link CapacitySchedulerConfiguration} and other queue
|
||||||
|
* properties.
|
||||||
|
**/
|
||||||
|
public class QueueAllocationSettings {
|
||||||
|
private final Resource minimumAllocation;
|
||||||
|
private Resource maximumAllocation;
|
||||||
|
|
||||||
|
public QueueAllocationSettings(CapacitySchedulerContext csContext) {
|
||||||
|
this.minimumAllocation = csContext.getMinimumResourceCapability();
|
||||||
|
}
|
||||||
|
|
||||||
|
void setupMaximumAllocation(CapacitySchedulerConfiguration csConf, String queuePath,
|
||||||
|
CSQueue parent, CapacitySchedulerContext csContext) {
|
||||||
|
/* YARN-10869: When using AutoCreatedLeafQueues, the passed configuration
|
||||||
|
* object is a cloned one containing only the template configs
|
||||||
|
* (see ManagedParentQueue#getLeafQueueConfigs). To ensure that the actual
|
||||||
|
* cluster maximum allocation is fetched the original config object should
|
||||||
|
* be used.
|
||||||
|
*/
|
||||||
|
Resource clusterMax = ResourceUtils
|
||||||
|
.fetchMaximumAllocationFromConfig(csContext.getConfiguration());
|
||||||
|
Resource queueMax = csConf.getQueueMaximumAllocation(queuePath);
|
||||||
|
|
||||||
|
maximumAllocation = Resources.clone(
|
||||||
|
parent == null ? clusterMax : parent.getMaximumAllocation());
|
||||||
|
|
||||||
|
String errMsg =
|
||||||
|
"Queue maximum allocation cannot be larger than the cluster setting"
|
||||||
|
+ " for queue " + queuePath
|
||||||
|
+ " max allocation per queue: %s"
|
||||||
|
+ " cluster setting: " + clusterMax;
|
||||||
|
|
||||||
|
if (queueMax == Resources.none()) {
|
||||||
|
// Handle backward compatibility
|
||||||
|
long queueMemory = csConf.getQueueMaximumAllocationMb(queuePath);
|
||||||
|
int queueVcores = csConf.getQueueMaximumAllocationVcores(queuePath);
|
||||||
|
if (queueMemory != UNDEFINED) {
|
||||||
|
maximumAllocation.setMemorySize(queueMemory);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (queueVcores != UNDEFINED) {
|
||||||
|
maximumAllocation.setVirtualCores(queueVcores);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((queueMemory != UNDEFINED && queueMemory > clusterMax.getMemorySize()
|
||||||
|
|| (queueVcores != UNDEFINED
|
||||||
|
&& queueVcores > clusterMax.getVirtualCores()))) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
String.format(errMsg, maximumAllocation));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Queue level maximum-allocation can't be larger than cluster setting
|
||||||
|
for (ResourceInformation ri : queueMax.getResources()) {
|
||||||
|
if (ri.compareTo(clusterMax.getResourceInformation(ri.getName())) > 0) {
|
||||||
|
throw new IllegalArgumentException(String.format(errMsg, queueMax));
|
||||||
|
}
|
||||||
|
|
||||||
|
maximumAllocation.setResourceInformation(ri.getName(), ri);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getMinimumAllocation() {
|
||||||
|
return minimumAllocation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getMaximumAllocation() {
|
||||||
|
return maximumAllocation;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,132 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class determines application lifetime and max parallel apps settings based on the
|
||||||
|
* {@link CapacitySchedulerConfiguration} and other queue
|
||||||
|
* properties.
|
||||||
|
**/
|
||||||
|
public class QueueAppLifetimeAndLimitSettings {
|
||||||
|
// -1 indicates lifetime is disabled
|
||||||
|
private final long maxApplicationLifetime;
|
||||||
|
private final long defaultApplicationLifetime;
|
||||||
|
|
||||||
|
// Indicates if this queue's default lifetime was set by a config property,
|
||||||
|
// either at this level or anywhere in the queue's hierarchy.
|
||||||
|
private boolean defaultAppLifetimeWasSpecifiedInConfig = false;
|
||||||
|
|
||||||
|
private int maxParallelApps;
|
||||||
|
|
||||||
|
public QueueAppLifetimeAndLimitSettings(CapacitySchedulerConfiguration configuration,
|
||||||
|
AbstractCSQueue q, String queuePath) {
|
||||||
|
// Store max parallel apps property
|
||||||
|
this.maxParallelApps = configuration.getMaxParallelAppsForQueue(queuePath);
|
||||||
|
this.maxApplicationLifetime = getInheritedMaxAppLifetime(q, configuration);
|
||||||
|
this.defaultApplicationLifetime = setupInheritedDefaultAppLifetime(q, queuePath, configuration,
|
||||||
|
maxApplicationLifetime);
|
||||||
|
}
|
||||||
|
|
||||||
|
private long getInheritedMaxAppLifetime(CSQueue q, CapacitySchedulerConfiguration conf) {
|
||||||
|
CSQueue parentQ = q.getParent();
|
||||||
|
long maxAppLifetime = conf.getMaximumLifetimePerQueue(q.getQueuePath());
|
||||||
|
|
||||||
|
// If q is the root queue, then get max app lifetime from conf.
|
||||||
|
if (parentQ == null) {
|
||||||
|
return maxAppLifetime;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is not the root queue, get this queue's max app lifetime
|
||||||
|
// from the conf. The parent's max app lifetime will be used if it's
|
||||||
|
// not set for this queue.
|
||||||
|
// A value of 0 will override the parent's value and means no max lifetime.
|
||||||
|
// A negative value means that the parent's max should be used.
|
||||||
|
long parentsMaxAppLifetime = parentQ.getMaximumApplicationLifetime();
|
||||||
|
return (maxAppLifetime >= 0) ? maxAppLifetime : parentsMaxAppLifetime;
|
||||||
|
}
|
||||||
|
|
||||||
|
private long setupInheritedDefaultAppLifetime(CSQueue q,
|
||||||
|
String queuePath, CapacitySchedulerConfiguration conf, long myMaxAppLifetime) {
|
||||||
|
CSQueue parentQ = q.getParent();
|
||||||
|
long defaultAppLifetime = conf.getDefaultLifetimePerQueue(queuePath);
|
||||||
|
defaultAppLifetimeWasSpecifiedInConfig =
|
||||||
|
(defaultAppLifetime >= 0
|
||||||
|
|| (parentQ != null &&
|
||||||
|
parentQ.getDefaultAppLifetimeWasSpecifiedInConfig()));
|
||||||
|
|
||||||
|
// If q is the root queue, then get default app lifetime from conf.
|
||||||
|
if (parentQ == null) {
|
||||||
|
return defaultAppLifetime;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is not the root queue, get the parent's default app lifetime. The
|
||||||
|
// parent's default app lifetime will be used if not set for this queue.
|
||||||
|
long parentsDefaultAppLifetime = parentQ.getDefaultApplicationLifetime();
|
||||||
|
|
||||||
|
// Negative value indicates default lifetime was not set at this level.
|
||||||
|
// If default lifetime was not set at this level, calculate it based on
|
||||||
|
// parent's default lifetime or current queue's max lifetime.
|
||||||
|
if (defaultAppLifetime < 0) {
|
||||||
|
// If default lifetime was not set at this level but was set somewhere in
|
||||||
|
// the parent's hierarchy, set default lifetime to parent queue's default
|
||||||
|
// only if parent queue's lifetime is less than current queue's max
|
||||||
|
// lifetime. Otherwise, use current queue's max lifetime value for its
|
||||||
|
// default lifetime.
|
||||||
|
if (defaultAppLifetimeWasSpecifiedInConfig) {
|
||||||
|
defaultAppLifetime =
|
||||||
|
Math.min(parentsDefaultAppLifetime, myMaxAppLifetime);
|
||||||
|
} else {
|
||||||
|
// Default app lifetime value was not set anywhere in this queue's
|
||||||
|
// hierarchy. Use current queue's max lifetime as its default.
|
||||||
|
defaultAppLifetime = myMaxAppLifetime;
|
||||||
|
}
|
||||||
|
} // else if >= 0, default lifetime was set at this level. Just use it.
|
||||||
|
|
||||||
|
if (myMaxAppLifetime > 0 && defaultAppLifetime > myMaxAppLifetime) {
|
||||||
|
throw new YarnRuntimeException(
|
||||||
|
"Default lifetime " + defaultAppLifetime
|
||||||
|
+ " can't exceed maximum lifetime " + myMaxAppLifetime);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (defaultAppLifetime <= 0) {
|
||||||
|
defaultAppLifetime = myMaxAppLifetime;
|
||||||
|
}
|
||||||
|
return defaultAppLifetime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxParallelApps() {
|
||||||
|
return maxParallelApps;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxParallelApps(int maxParallelApps) {
|
||||||
|
this.maxParallelApps = maxParallelApps;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMaxApplicationLifetime() {
|
||||||
|
return maxApplicationLifetime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDefaultApplicationLifetime() {
|
||||||
|
return defaultApplicationLifetime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDefaultAppLifetimeWasSpecifiedInConfig() {
|
||||||
|
return defaultAppLifetimeWasSpecifiedInConfig;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,147 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.util.Sets;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class determines accessible node labels, configured node labels and the default node
|
||||||
|
* label expression based on the {@link CapacitySchedulerConfiguration} object and other queue
|
||||||
|
* properties.
|
||||||
|
*/
|
||||||
|
public class QueueNodeLabelsSettings {
|
||||||
|
private final CSQueue parent;
|
||||||
|
private final String queuePath;
|
||||||
|
private final CapacitySchedulerContext csContext;
|
||||||
|
private Set<String> accessibleLabels;
|
||||||
|
private Set<String> configuredNodeLabels;
|
||||||
|
private String defaultLabelExpression;
|
||||||
|
|
||||||
|
public QueueNodeLabelsSettings(CapacitySchedulerConfiguration configuration,
|
||||||
|
CSQueue parent,
|
||||||
|
String queuePath,
|
||||||
|
CapacitySchedulerContext csContext) throws IOException {
|
||||||
|
this.parent = parent;
|
||||||
|
this.queuePath = queuePath;
|
||||||
|
this.csContext = csContext;
|
||||||
|
initializeNodeLabels(configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeNodeLabels(CapacitySchedulerConfiguration configuration)
|
||||||
|
throws IOException {
|
||||||
|
initializeAccessibleLabels(configuration);
|
||||||
|
initializeDefaultLabelExpression(configuration);
|
||||||
|
initializeConfiguredNodeLabels();
|
||||||
|
validateNodeLabels();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeAccessibleLabels(CapacitySchedulerConfiguration configuration) {
|
||||||
|
this.accessibleLabels = configuration.getAccessibleNodeLabels(queuePath);
|
||||||
|
// Inherit labels from parent if not set
|
||||||
|
if (this.accessibleLabels == null && parent != null) {
|
||||||
|
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeDefaultLabelExpression(CapacitySchedulerConfiguration configuration) {
|
||||||
|
this.defaultLabelExpression = configuration.getDefaultNodeLabelExpression(queuePath);
|
||||||
|
// If the accessible labels is not null and the queue has a parent with a
|
||||||
|
// similar set of labels copy the defaultNodeLabelExpression from the parent
|
||||||
|
if (this.accessibleLabels != null && parent != null
|
||||||
|
&& this.defaultLabelExpression == null &&
|
||||||
|
this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
|
||||||
|
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeConfiguredNodeLabels() {
|
||||||
|
if (csContext.getCapacitySchedulerQueueManager() != null
|
||||||
|
&& csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels() != null) {
|
||||||
|
if (queuePath.equals(ROOT)) {
|
||||||
|
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
||||||
|
.getConfiguredNodeLabels().getAllConfiguredLabels();
|
||||||
|
} else {
|
||||||
|
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
||||||
|
.getConfiguredNodeLabels().getLabelsByQueue(queuePath);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Fallback to suboptimal but correct logic
|
||||||
|
this.configuredNodeLabels = csContext.getConfiguration().getConfiguredNodeLabels(queuePath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateNodeLabels() throws IOException {
|
||||||
|
// Check if labels of this queue is a subset of parent queue, only do this
|
||||||
|
// when the queue in question is not root
|
||||||
|
if (isNotRoot()) {
|
||||||
|
if (parent.getAccessibleNodeLabels() != null && !parent
|
||||||
|
.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||||
|
// If parent isn't "*", child shouldn't be "*" too
|
||||||
|
if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||||
|
throw new IOException("Parent's accessible queue is not ANY(*), "
|
||||||
|
+ "but child's accessible queue is " + RMNodeLabelsManager.ANY);
|
||||||
|
} else {
|
||||||
|
Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(),
|
||||||
|
parent.getAccessibleNodeLabels());
|
||||||
|
if (!diff.isEmpty()) {
|
||||||
|
throw new IOException(String.format(
|
||||||
|
"Some labels of child queue is not a subset of parent queue, these labels=[%s]",
|
||||||
|
StringUtils.join(diff, ",")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isNotRoot() {
|
||||||
|
return parent != null && parent.getParent() != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAccessibleToPartition(String nodePartition) {
|
||||||
|
// if queue's label is *, it can access any node
|
||||||
|
if (accessibleLabels != null && accessibleLabels.contains(RMNodeLabelsManager.ANY)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// any queue can access to a node without label
|
||||||
|
if (nodePartition == null || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// a queue can access to a node only if it contains any label of the node
|
||||||
|
if (accessibleLabels != null && accessibleLabels.contains(nodePartition)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// The partition cannot be accessed
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getAccessibleNodeLabels() {
|
||||||
|
return accessibleLabels;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getConfiguredNodeLabels() {
|
||||||
|
return configuredNodeLabels;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDefaultLabelExpression() {
|
||||||
|
return defaultLabelExpression;
|
||||||
|
}
|
||||||
|
}
|
|
@ -5261,7 +5261,7 @@ public class TestLeafQueue {
|
||||||
ParentQueue rootQueue = (ParentQueue) cs.getRootQueue();
|
ParentQueue rootQueue = (ParentQueue) cs.getRootQueue();
|
||||||
|
|
||||||
Assert.assertEquals(Sets.newHashSet("", "test", "test2"),
|
Assert.assertEquals(Sets.newHashSet("", "test", "test2"),
|
||||||
rootQueue.configuredNodeLabels);
|
rootQueue.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
|
Loading…
Reference in New Issue