YARN-10945. Add javadoc to all methods of AbstractCSQueue. Contributed by Andras Gyori
This commit is contained in:
parent
56d807175d
commit
2ece95064b
|
@ -77,6 +77,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
|
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides implementation of {@code CSQueue} methods common for every queue class in Capacity
|
||||||
|
* Scheduler.
|
||||||
|
*/
|
||||||
public abstract class AbstractCSQueue implements CSQueue {
|
public abstract class AbstractCSQueue implements CSQueue {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AbstractCSQueue.class);
|
LoggerFactory.getLogger(AbstractCSQueue.class);
|
||||||
|
@ -106,11 +110,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
CSQueueUsageTracker usageTracker;
|
CSQueueUsageTracker usageTracker;
|
||||||
|
|
||||||
public enum CapacityConfigType {
|
public enum CapacityConfigType {
|
||||||
// FIXME, from what I can see, Percentage mode can almost apply to weighted
|
|
||||||
// and percentage mode at the same time, there's only small area need to be
|
|
||||||
// changed, we need to rename "PERCENTAGE" to "PERCENTAGE" and "WEIGHT"
|
|
||||||
NONE, PERCENTAGE, ABSOLUTE_RESOURCE
|
NONE, PERCENTAGE, ABSOLUTE_RESOURCE
|
||||||
};
|
};
|
||||||
|
|
||||||
protected CapacityConfigType capacityConfigType =
|
protected CapacityConfigType capacityConfigType =
|
||||||
CapacityConfigType.NONE;
|
CapacityConfigType.NONE;
|
||||||
|
|
||||||
|
@ -175,6 +177,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return new QueuePath(parent.getQueuePath(), queueName);
|
return new QueuePath(parent.getQueuePath(), queueName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets up capacity and weight values from configuration.
|
||||||
|
*/
|
||||||
protected void setupConfigurableCapacities() {
|
protected void setupConfigurableCapacities() {
|
||||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
|
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
|
||||||
queueContext.getConfiguration(), this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
queueContext.getConfiguration(), this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
||||||
|
@ -272,6 +277,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return queueNodeLabelsSettings.getAccessibleNodeLabels();
|
return queueNodeLabelsSettings.getAccessibleNodeLabels();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether the user has the required permission to execute the action of {@code QueueACL}.
|
||||||
|
* @param acl the access type the user is checked for
|
||||||
|
* @param user UGI of the user
|
||||||
|
* @return true, if the user has permission, false otherwise
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
|
||||||
return authorizer.checkPermission(
|
return authorizer.checkPermission(
|
||||||
|
@ -319,6 +330,11 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return this.queueNodeLabelsSettings.getDefaultLabelExpression();
|
return this.queueNodeLabelsSettings.getDefaultLabelExpression();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize queue properties that are based on configuration.
|
||||||
|
* @param clusterResource overall resource of the cluster
|
||||||
|
* @throws IOException if configuration is set in a way that is inconsistent
|
||||||
|
*/
|
||||||
protected void setupQueueConfigs(Resource clusterResource) throws
|
protected void setupQueueConfigs(Resource clusterResource) throws
|
||||||
IOException {
|
IOException {
|
||||||
|
|
||||||
|
@ -460,6 +476,11 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initializes configured minimum and maximum capacity from configuration, if capacity is defined
|
||||||
|
* in ABSOLUTE node.
|
||||||
|
* @param clusterResource overall resource of the cluster
|
||||||
|
*/
|
||||||
protected void updateConfigurableResourceLimits(Resource clusterResource) {
|
protected void updateConfigurableResourceLimits(Resource clusterResource) {
|
||||||
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
|
for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
|
||||||
final Resource minResource = getMinimumAbsoluteResource(getQueuePath(), label);
|
final Resource minResource = getMinimumAbsoluteResource(getQueuePath(), label);
|
||||||
|
@ -647,6 +668,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return queueAllocationSettings.getMinimumAllocation();
|
return queueAllocationSettings.getMinimumAllocation();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments resource usage of the queue and all related statistics and metrics that depends on
|
||||||
|
* it.
|
||||||
|
* @param clusterResource overall cluster resource
|
||||||
|
* @param resource resource amount to increment
|
||||||
|
* @param nodePartition node label
|
||||||
|
*/
|
||||||
void allocateResource(Resource clusterResource,
|
void allocateResource(Resource clusterResource,
|
||||||
Resource resource, String nodePartition) {
|
Resource resource, String nodePartition) {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -660,6 +688,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrements resource usage of the queue and all related statistics and metrics that depends on
|
||||||
|
* it.
|
||||||
|
* @param clusterResource overall cluster resource
|
||||||
|
* @param resource resource amount to decrement
|
||||||
|
* @param nodePartition node label
|
||||||
|
*/
|
||||||
protected void releaseResource(Resource clusterResource,
|
protected void releaseResource(Resource clusterResource,
|
||||||
Resource resource, String nodePartition) {
|
Resource resource, String nodePartition) {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -675,6 +710,10 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether we should continue to look at all heart beating nodes even
|
||||||
|
* after the reservation limit was hit.
|
||||||
|
*/
|
||||||
@Private
|
@Private
|
||||||
public boolean isReservationsContinueLooking() {
|
public boolean isReservationsContinueLooking() {
|
||||||
return reservationsContinueLooking;
|
return reservationsContinueLooking;
|
||||||
|
@ -759,6 +798,15 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return childQueues != null && !childQueues.isEmpty();
|
return childQueues != null && !childQueues.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether this queue has remaining resources left for further container assigment.
|
||||||
|
* @param clusterResource overall cluster resource
|
||||||
|
* @param nodePartition node label
|
||||||
|
* @param currentResourceLimits limit of the queue imposed by its maximum capacity
|
||||||
|
* @param resourceCouldBeUnreserved reserved resource that could potentially be unreserved
|
||||||
|
* @param schedulingMode scheduling strategy to handle node labels
|
||||||
|
* @return true if queue has remaining free resource, false otherwise
|
||||||
|
*/
|
||||||
boolean canAssignToThisQueue(Resource clusterResource,
|
boolean canAssignToThisQueue(Resource clusterResource,
|
||||||
String nodePartition, ResourceLimits currentResourceLimits,
|
String nodePartition, ResourceLimits currentResourceLimits,
|
||||||
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
|
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
|
||||||
|
@ -932,10 +980,14 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Priority getDefaultApplicationPriority() {
|
public Priority getDefaultApplicationPriority() {
|
||||||
// TODO add dummy implementation
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the union of all node labels that could be accessed by this queue based on accessible
|
||||||
|
* node labels and configured node labels properties.
|
||||||
|
* @return node labels this queue has access to
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getNodeLabelsForQueue() {
|
public Set<String> getNodeLabelsForQueue() {
|
||||||
// if queue's label is *, queue can access any labels. Instead of
|
// if queue's label is *, queue can access any labels. Instead of
|
||||||
|
@ -978,6 +1030,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
resourceLimits, schedulingMode);
|
resourceLimits, schedulingMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether this queue could accept the container allocation request.
|
||||||
|
* @param cluster overall cluster resource
|
||||||
|
* @param request container allocation request
|
||||||
|
* @return true if queue could accept the container allocation request, false otherwise
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Resource cluster,
|
public boolean accept(Resource cluster,
|
||||||
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
|
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
|
||||||
|
@ -1043,6 +1101,10 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
this.state = queueState;
|
this.state = queueState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the state of this queue to RUNNING.
|
||||||
|
* @throws YarnException if its parent queue is not in RUNNING state
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void activateQueue() throws YarnException {
|
public void activateQueue() throws YarnException {
|
||||||
this.writeLock.lock();
|
this.writeLock.lock();
|
||||||
|
@ -1064,6 +1126,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops this queue if no application is currently running on the queue.
|
||||||
|
*/
|
||||||
protected void appFinished() {
|
protected void appFinished() {
|
||||||
this.writeLock.lock();
|
this.writeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -1087,6 +1152,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return userWeights;
|
return userWeights;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recursively sets the state of this queue and the state of its parent to DRAINING.
|
||||||
|
*/
|
||||||
public void recoverDrainingState() {
|
public void recoverDrainingState() {
|
||||||
this.writeLock.lock();
|
this.writeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -1308,10 +1376,19 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether this queue is a dynamic queue and could be deleted.
|
||||||
|
* @return true if the dynamic queue could be deleted, false otherwise
|
||||||
|
*/
|
||||||
public boolean isEligibleForAutoDeletion() {
|
public boolean isEligibleForAutoDeletion() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether this queue is a dynamic queue and there has not been an application submission
|
||||||
|
* on it for a configured period of time.
|
||||||
|
* @return true if queue has been idle for a configured period of time, false otherwise
|
||||||
|
*/
|
||||||
public boolean isInactiveDynamicQueue() {
|
public boolean isInactiveDynamicQueue() {
|
||||||
long idleDurationSeconds =
|
long idleDurationSeconds =
|
||||||
(Time.monotonicNow() - getLastSubmittedTimestamp())/1000;
|
(Time.monotonicNow() - getLastSubmittedTimestamp())/1000;
|
||||||
|
|
Loading…
Reference in New Issue