YARN-3140. Improve locks in AbstractCSQueue/LeafQueue/ParentQueue. Contributed by Wangda Tan

(cherry picked from commit 2b66d9ec5b)
This commit is contained in:
Jian He 2016-09-20 15:03:07 +08:00
parent 4e376f162f
commit 3acd30df71
7 changed files with 1817 additions and 1468 deletions

View File

@ -537,4 +537,14 @@
</Or>
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
</Match>
<!-- Ignore VO_VOLATILE_INCREMENT, they will be protected by writeLock -->
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User" />
<Or>
<Field name="pendingApplications" />
<Field name="activeApplications" />
</Or>
<Bug pattern="VO_VOLATILE_INCREMENT" />
</Match>
</FindBugsFilter>

View File

@ -24,6 +24,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -60,25 +61,25 @@
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
CSQueue parent;
volatile CSQueue parent;
final String queueName;
volatile int numContainers;
final Resource minimumAllocation;
volatile Resource maximumAllocation;
QueueState state;
volatile QueueState state;
final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity;
final ResourceCalculator resourceCalculator;
Set<String> accessibleLabels;
RMNodeLabelsManager labelManager;
final RMNodeLabelsManager labelManager;
String defaultLabelExpression;
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
volatile boolean reservationsContinueLooking;
private boolean preemptionDisabled;
private volatile boolean preemptionDisabled;
// Track resource usage-by-label like used-resource/pending-resource, etc.
volatile ResourceUsage queueUsage;
@ -94,6 +95,9 @@ public abstract class AbstractCSQueue implements CSQueue {
protected ActivitiesManager activitiesManager;
protected ReentrantReadWriteLock.ReadLock readLock;
protected ReentrantReadWriteLock.WriteLock writeLock;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this.labelManager = cs.getRMContext().getNodeLabelManager();
@ -117,6 +121,10 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
// initialize QueueCapacities
queueCapacities = new QueueCapacities(parent == null);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
}
protected void setupConfigurableCapacities() {
@ -128,12 +136,12 @@ protected void setupConfigurableCapacities() {
}
@Override
public synchronized float getCapacity() {
public float getCapacity() {
return queueCapacities.getCapacity();
}
@Override
public synchronized float getAbsoluteCapacity() {
public float getAbsoluteCapacity() {
return queueCapacities.getAbsoluteCapacity();
}
@ -167,7 +175,7 @@ public int getNumContainers() {
}
@Override
public synchronized QueueState getState() {
public QueueState getState() {
return state;
}
@ -187,13 +195,13 @@ public PrivilegedEntity getPrivilegedEntity() {
}
@Override
public synchronized CSQueue getParent() {
public CSQueue getParent() {
return parent;
}
@Override
public synchronized void setParent(CSQueue newParentQueue) {
this.parent = (ParentQueue)newParentQueue;
public void setParent(CSQueue newParentQueue) {
this.parent = newParentQueue;
}
public Set<String> getAccessibleNodeLabels() {
@ -221,18 +229,22 @@ public void setAbsoluteUsedCapacity(float absUsedCapacity) {
* Set maximum capacity - used only for testing.
* @param maximumCapacity new max capacity
*/
synchronized void setMaxCapacity(float maximumCapacity) {
void setMaxCapacity(float maximumCapacity) {
try {
writeLock.lock();
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(),
queueCapacities.getCapacity(), maximumCapacity);
float absMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(
maximumCapacity, parent);
CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
queueCapacities.getAbsoluteCapacity(),
absMaxCapacity);
queueCapacities.getAbsoluteCapacity(), absMaxCapacity);
queueCapacities.setMaximumCapacity(maximumCapacity);
queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
} finally {
writeLock.unlock();
}
}
@Override
@ -240,13 +252,16 @@ public String getDefaultNodeLabelExpression() {
return defaultLabelExpression;
}
synchronized void setupQueueConfigs(Resource clusterResource)
void setupQueueConfigs(Resource clusterResource)
throws IOException {
try {
writeLock.lock();
// get labels
this.accessibleLabels =
csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
this.defaultLabelExpression = csContext.getConfiguration()
.getDefaultNodeLabelExpression(getQueuePath());
this.defaultLabelExpression =
csContext.getConfiguration().getDefaultNodeLabelExpression(
getQueuePath());
// inherit from parent if labels not set
if (this.accessibleLabels == null && parent != null) {
@ -255,7 +270,8 @@ synchronized void setupQueueConfigs(Resource clusterResource)
// inherit from parent if labels not set
if (this.defaultLabelExpression == null && parent != null
&& this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
&& this.accessibleLabels.containsAll(
parent.getAccessibleNodeLabels())) {
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
}
@ -278,32 +294,40 @@ synchronized void setupQueueConfigs(Resource clusterResource)
// Check if labels of this queue is a subset of parent queue, only do this
// when we not root
if (parent != null && parent.getParent() != null) {
if (parent.getAccessibleNodeLabels() != null
&& !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
if (parent.getAccessibleNodeLabels() != null && !parent
.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
// if parent isn't "*", child shouldn't be "*" too
if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
if (this.getAccessibleNodeLabels().contains(
RMNodeLabelsManager.ANY)) {
throw new IOException("Parent's accessible queue is not ANY(*), "
+ "but child's accessible queue is *");
} else {
Set<String> diff =
Sets.difference(this.getAccessibleNodeLabels(),
} else{
Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(),
parent.getAccessibleNodeLabels());
if (!diff.isEmpty()) {
throw new IOException("Some labels of child queue is not a subset "
+ "of parent queue, these labels=["
+ StringUtils.join(diff, ",") + "]");
throw new IOException(
"Some labels of child queue is not a subset "
+ "of parent queue, these labels=[" + StringUtils
.join(diff, ",") + "]");
}
}
}
}
this.reservationsContinueLooking = csContext.getConfiguration()
.getReservationContinueLook();
this.reservationsContinueLooking =
csContext.getConfiguration().getReservationContinueLook();
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
} finally {
writeLock.unlock();
}
}
protected QueueInfo getQueueInfo() {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
// consistency here.
// TODO, improve this
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setAccessibleNodeLabels(accessibleLabels);
@ -318,8 +342,12 @@ protected QueueInfo getQueueInfo() {
}
public QueueStatistics getQueueStatistics() {
QueueStatistics stats =
recordFactory.newRecordInstance(QueueStatistics.class);
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
// consistency here.
// TODO, improve this
QueueStatistics stats = recordFactory.newRecordInstance(
QueueStatistics.class);
stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted());
stats.setNumAppsRunning(getMetrics().getAppsRunning());
stats.setNumAppsPending(getMetrics().getAppsPending());
@ -351,8 +379,10 @@ public Resource getMinimumAllocation() {
return minimumAllocation;
}
synchronized void allocateResource(Resource clusterResource,
void allocateResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) {
try {
writeLock.lock();
queueUsage.incUsed(nodePartition, resource);
if (!changeContainerResource) {
@ -360,10 +390,15 @@ synchronized void allocateResource(Resource clusterResource,
}
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
} finally {
writeLock.unlock();
}
}
protected synchronized void releaseResource(Resource clusterResource,
protected void releaseResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) {
try {
writeLock.lock();
queueUsage.decUsed(nodePartition, resource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@ -372,6 +407,9 @@ protected synchronized void releaseResource(Resource clusterResource,
if (!changeContainerResource) {
--numContainers;
}
} finally {
writeLock.unlock();
}
}
@Private
@ -381,7 +419,13 @@ public boolean getReservationContinueLooking() {
@Private
public Map<AccessType, AccessControlList> getACLs() {
try {
readLock.lock();
return acls;
} finally {
readLock.unlock();
}
}
@Private
@ -464,9 +508,11 @@ Resource getQueueMaxResource(String nodePartition, Resource clusterResource) {
minimumAllocation);
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
boolean canAssignToThisQueue(Resource clusterResource,
String nodePartition, ResourceLimits currentResourceLimits,
Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
try {
readLock.lock();
// Get current limited resource:
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
// queues' max capacity.
@ -478,9 +524,8 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
// idle resource on the partition, to avoid wastage, such resource will be
// leveraged as much as we can, and preemption policy will reclaim it back
// when partitoned-resource-request comes back.
Resource currentLimitResource =
getCurrentLimitResource(nodePartition, clusterResource,
currentResourceLimits, schedulingMode);
Resource currentLimitResource = getCurrentLimitResource(nodePartition,
clusterResource, currentResourceLimits, schedulingMode);
Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
@ -502,23 +547,24 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
// potentially use this node instead of a reserved node if the application
// has reserved containers.
// TODO, now only consider reservation cases when the node has no label
if (this.reservationsContinueLooking
&& nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
if (this.reservationsContinueLooking && nodePartition.equals(
RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan(
resourceCalculator, clusterResource, resourceCouldBeUnreserved,
Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved);
Resource newTotalWithoutReservedResource = Resources.subtract(
usedExceptKillable, resourceCouldBeUnreserved);
// when total-used-without-reserved-resource < currentLimit, we still
// have chance to allocate on this node by unreserving some containers
if (Resources.lessThan(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
LOG.debug(
"try to use reserved: " + getQueueName() + " usedResources: "
+ queueUsage.getUsed() + ", clusterResources: "
+ clusterResource + ", reservedResources: "
+ resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
@ -527,23 +573,23 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, nodePartition="
+ nodePartition
+ " usedResources: "
+ queueUsage.getUsed(nodePartition)
+ " clusterResources: "
+ clusterResource
+ " currentUsedCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(nodePartition),
labelManager.getResourceByLabel(nodePartition, clusterResource))
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
LOG.debug(getQueueName() + "Check assign to queue, nodePartition="
+ nodePartition + " usedResources: " + queueUsage
.getUsed(nodePartition) + " clusterResources: " + clusterResource
+ " currentUsedCapacity " + Resources
.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(nodePartition), labelManager
.getResourceByLabel(nodePartition, clusterResource))
+ " max-capacity: " + queueCapacities
.getAbsoluteMaximumCapacity(nodePartition) + ")");
}
return false;
}
return true;
} finally {
readLock.unlock();
}
}
@Override

View File

@ -107,8 +107,10 @@ public ParentQueue(CapacitySchedulerContext cs,
", fullname=" + getQueuePath());
}
synchronized void setupQueueConfigs(Resource clusterResource)
void setupQueueConfigs(Resource clusterResource)
throws IOException {
try {
writeLock.lock();
super.setupQueueConfigs(clusterResource);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
@ -123,19 +125,23 @@ synchronized void setupQueueConfigs(Resource clusterResource)
}
}
LOG.info(queueName +
", capacity=" + this.queueCapacities.getCapacity() +
", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
", maxCapacity=" + this.queueCapacities.getMaximumCapacity() +
", absoluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() +
", state=" + state +
", acls=" + aclsString +
", labels=" + labelStrBuilder.toString() + "\n" +
", reservationsContinueLooking=" + reservationsContinueLooking);
LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity()
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
+ ", absoluteMaxCapacity=" + this.queueCapacities
.getAbsoluteMaximumCapacity() + ", state=" + state + ", acls="
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
+ ", reservationsContinueLooking=" + reservationsContinueLooking);
} finally {
writeLock.unlock();
}
}
private static float PRECISION = 0.0005f; // 0.05% precision
synchronized void setChildQueues(Collection<CSQueue> childQueues) {
void setChildQueues(Collection<CSQueue> childQueues) {
try {
writeLock.lock();
// Validate
float childCapacities = 0;
for (CSQueue queue : childQueues) {
@ -143,11 +149,11 @@ synchronized void setChildQueues(Collection<CSQueue> childQueues) {
}
float delta = Math.abs(1.0f - childCapacities); // crude way to check
// allow capacities being set to 0, and enforce child 0 if parent is 0
if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) ||
((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
throw new IllegalArgumentException("Illegal" +
" capacity of " + childCapacities +
" for children of queue " + queueName);
if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || (
(queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
throw new IllegalArgumentException(
"Illegal" + " capacity of " + childCapacities
+ " for children of queue " + queueName);
}
// check label capacities
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
@ -159,9 +165,9 @@ synchronized void setChildQueues(Collection<CSQueue> childQueues) {
}
if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
|| (capacityByLabel == 0) && (sum > 0)) {
throw new IllegalArgumentException("Illegal" + " capacity of "
+ sum + " for children of queue " + queueName
+ " for label=" + nodeLabel);
throw new IllegalArgumentException(
"Illegal" + " capacity of " + sum + " for children of queue "
+ queueName + " for label=" + nodeLabel);
}
}
@ -170,6 +176,9 @@ synchronized void setChildQueues(Collection<CSQueue> childQueues) {
if (LOG.isDebugEnabled()) {
LOG.debug("setChildQueues: " + getChildQueuesToPrint());
}
} finally {
writeLock.unlock();
}
}
@Override
@ -179,27 +188,34 @@ public String getQueuePath() {
}
@Override
public synchronized QueueInfo getQueueInfo(
public QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
try {
readLock.lock();
QueueInfo queueInfo = getQueueInfo();
List<QueueInfo> childQueuesInfo = new ArrayList<QueueInfo>();
List<QueueInfo> childQueuesInfo = new ArrayList<>();
if (includeChildQueues) {
for (CSQueue child : childQueues) {
// Get queue information recursively?
childQueuesInfo.add(
child.getQueueInfo(recursive, recursive));
childQueuesInfo.add(child.getQueueInfo(recursive, recursive));
}
}
queueInfo.setChildQueues(childQueuesInfo);
return queueInfo;
} finally {
readLock.unlock();
}
private synchronized QueueUserACLInfo getUserAclInfo(
}
private QueueUserACLInfo getUserAclInfo(
UserGroupInformation user) {
QueueUserACLInfo userAclInfo =
recordFactory.newRecordInstance(QueueUserACLInfo.class);
try {
readLock.lock();
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>();
for (QueueACL operation : QueueACL.values()) {
if (hasAccess(operation, user)) {
@ -210,12 +226,18 @@ private synchronized QueueUserACLInfo getUserAclInfo(
userAclInfo.setQueueName(getQueueName());
userAclInfo.setUserAcls(operations);
return userAclInfo;
} finally {
readLock.unlock();
}
}
@Override
public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
public List<QueueUserACLInfo> getQueueUserAclInfo(
UserGroupInformation user) {
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
try {
readLock.lock();
List<QueueUserACLInfo> userAcls = new ArrayList<>();
// Add parent queue acls
userAcls.add(getUserAclInfo(user));
@ -226,6 +248,10 @@ public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
}
return userAcls;
} finally {
readLock.unlock();
}
}
public String toString() {
@ -240,16 +266,19 @@ public String toString() {
}
@Override
public synchronized void reinitialize(CSQueue newlyParsedQueue,
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
try {
writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof ParentQueue) ||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
throw new IOException("Trying to reinitialize " + getQueuePath() +
" from " + newlyParsedQueue.getQueuePath());
if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue;
// Set new configs
setupQueueConfigs(clusterResource);
@ -257,8 +286,8 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
Map<String, CSQueue> newChildQueues =
getQueues(newlyParsedParentQueue.childQueues);
Map<String, CSQueue> newChildQueues = getQueues(
newlyParsedParentQueue.childQueues);
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
String newChildQueueName = e.getKey();
CSQueue newChildQueue = e.getValue();
@ -270,7 +299,7 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
// Re-init existing child queues
childQueue.reinitialize(newChildQueue, clusterResource);
LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
} else {
} else{
// New child queue, do not re-init
// Set parent to 'this'
@ -279,13 +308,17 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
// Save in list of current child queues
currentChildQueues.put(newChildQueueName, newChildQueue);
LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
LOG.info(
getQueueName() + ": added new child queue: " + newChildQueue);
}
}
// Re-sort all queues
childQueues.clear();
childQueues.addAll(currentChildQueues.values());
} finally {
writeLock.unlock();
}
}
Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
@ -300,20 +333,23 @@ Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
public void submitApplication(ApplicationId applicationId, String user,
String queue) throws AccessControlException {
synchronized (this) {
try {
writeLock.lock();
// Sanity check
if (queue.equals(queueName)) {
throw new AccessControlException("Cannot submit application " +
"to non-leaf queue: " + queueName);
throw new AccessControlException(
"Cannot submit application " + "to non-leaf queue: " + queueName);
}
if (state != QueueState.RUNNING) {
throw new AccessControlException("Queue " + getQueuePath() +
" is STOPPED. Cannot accept submission of application: " +
applicationId);
throw new AccessControlException("Queue " + getQueuePath()
+ " is STOPPED. Cannot accept submission of application: "
+ applicationId);
}
addApplication(applicationId, user);
} finally {
writeLock.unlock();
}
// Inform the parent queue
@ -342,24 +378,26 @@ public void finishApplicationAttempt(FiCaSchedulerApp application,
// finish attempt logic.
}
private synchronized void addApplication(ApplicationId applicationId,
private void addApplication(ApplicationId applicationId,
String user) {
try {
writeLock.lock();
++numApplications;
LOG.info("Application added -" +
" appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
LOG.info(
"Application added -" + " appId: " + applicationId + " user: " + user
+ " leaf-queue of parent: " + getQueueName() + " #applications: "
+ getNumApplications());
} finally {
writeLock.unlock();
}
}
@Override
public void finishApplication(ApplicationId application, String user) {
synchronized (this) {
removeApplication(application, user);
}
// Inform the parent queue
if (parent != null) {
@ -367,16 +405,18 @@ public void finishApplication(ApplicationId application, String user) {
}
}
private synchronized void removeApplication(ApplicationId applicationId,
private void removeApplication(ApplicationId applicationId,
String user) {
try {
writeLock.lock();
--numApplications;
LOG.info("Application removed -" +
" appId: " + applicationId +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
LOG.info("Application removed -" + " appId: " + applicationId + " user: "
+ user + " leaf-queue of parent: " + getQueueName()
+ " #applications: " + getNumApplications());
} finally {
writeLock.unlock();
}
}
private String getParentName() {
@ -384,9 +424,11 @@ private String getParentName() {
}
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits resourceLimits,
SchedulingMode schedulingMode) {
try {
writeLock.lock();
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
@ -410,12 +452,13 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// Check if this queue need more resource, simply skip allocation if this
// queue doesn't need more resources.
if (!super.hasPendingResourceRequest(node.getPartition(),
clusterResource, schedulingMode)) {
if (!super.hasPendingResourceRequest(node.getPartition(), clusterResource,
schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
+ schedulingMode.name() + " node-partition=" + node
.getPartition());
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
@ -429,8 +472,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
return CSAssignment.NULL_ASSIGNMENT;
}
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0),
NodeType.NODE_LOCAL);
while (canAssign(clusterResource, node)) {
if (LOG.isDebugEnabled()) {
@ -442,9 +485,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// This will also consider parent's limits and also continuous reservation
// looking
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
resourceLimits, Resources.createResource(
getMetrics().getReservedMB(), getMetrics()
.getReservedVirtualCores()), schedulingMode)) {
resourceLimits, Resources
.createResource(getMetrics().getReservedMB(),
getMetrics().getReservedVirtualCores()), schedulingMode)) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.SKIPPED,
@ -458,14 +501,12 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
}
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node, resourceLimits,
schedulingMode);
CSAssignment assignedToChild = assignContainersToChildQueues(
clusterResource, node, resourceLimits, schedulingMode);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
if (Resources.greaterThan(
resourceCalculator, clusterResource,
if (Resources.greaterThan(resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
@ -480,7 +521,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
.getFirstAllocatedOrReservedContainerId(),
AllocationState.ALLOCATED);
}
} else {
} else{
if (rootQueue) {
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
activitiesManager, node,
@ -495,8 +536,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
node.getPartition(), assignedToChild.isIncreasedAllocation());
// Track resource utilization in this pass of the scheduler
Resources
.addTo(assignment.getResource(), assignedToChild.getResource());
Resources.addTo(assignment.getResource(),
assignedToChild.getResource());
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
assignedToChild.getAssignmentInformation().getAllocated());
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
@ -505,28 +546,21 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
assignedToChild.getAssignmentInformation().getNumAllocations());
assignment.getAssignmentInformation().incrReservations(
assignedToChild.getAssignmentInformation().getNumReservations());
assignment
.getAssignmentInformation()
.getAllocationDetails()
.addAll(
assignedToChild.getAssignmentInformation().getAllocationDetails());
assignment
.getAssignmentInformation()
.getReservationDetails()
.addAll(
assignment.getAssignmentInformation().getAllocationDetails().addAll(
assignedToChild.getAssignmentInformation()
.getAllocationDetails());
assignment.getAssignmentInformation().getReservationDetails().addAll(
assignedToChild.getAssignmentInformation()
.getReservationDetails());
assignment.setIncreasedAllocation(assignedToChild
.isIncreasedAllocation());
assignment.setIncreasedAllocation(
assignedToChild.isIncreasedAllocation());
LOG.info("assignedContainer" +
" queue=" + getQueueName() +
" usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + queueUsage.getUsed() +
" cluster=" + clusterResource);
LOG.info("assignedContainer" + " queue=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed()
+ " cluster=" + clusterResource);
} else {
} else{
assignment.setSkippedType(assignedToChild.getSkippedType());
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
@ -541,10 +575,11 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
}
if (LOG.isDebugEnabled()) {
LOG.debug("ParentQ=" + getQueueName()
+ " assignedSoFarInThisIteration=" + assignment.getResource()
+ " usedCapacity=" + getUsedCapacity()
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
LOG.debug(
"ParentQ=" + getQueueName() + " assignedSoFarInThisIteration="
+ assignment.getResource() + " usedCapacity="
+ getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity());
}
// Do not assign more than one container if this isn't the root queue
@ -552,8 +587,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
LOG.debug("Not assigning more than one off-switch container," +
" assignments so far: " + assignment);
LOG.debug("Not assigning more than one off-switch container,"
+ " assignments so far: " + assignment);
}
}
break;
@ -561,6 +596,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
}
return assignment;
} finally {
writeLock.unlock();
}
}
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
@ -628,7 +666,7 @@ private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode
return childrenList.iterator();
}
private synchronized CSAssignment assignContainersToChildQueues(
private CSAssignment assignContainersToChildQueues(
Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
SchedulingMode schedulingMode) {
CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
@ -717,15 +755,17 @@ private void printChildQueues() {
}
}
private synchronized void internalReleaseResource(Resource clusterResource,
private void internalReleaseResource(Resource clusterResource,
FiCaSchedulerNode node, Resource releasedResource, boolean changeResource,
CSQueue completedChildQueue, boolean sortQueues) {
super.releaseResource(clusterResource,
releasedResource, node.getPartition(),
changeResource);
try {
writeLock.lock();
super.releaseResource(clusterResource, releasedResource,
node.getPartition(), changeResource);
if (LOG.isDebugEnabled()) {
LOG.debug("completedContainer " + this + ", cluster=" + clusterResource);
LOG.debug(
"completedContainer " + this + ", cluster=" + clusterResource);
}
// Note that this is using an iterator on the childQueues so this can't
@ -733,7 +773,8 @@ private synchronized void internalReleaseResource(Resource clusterResource,
// from assignContainersToChildQueues.
if (sortQueues) {
// reinsert the updated queue
for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) {
for (Iterator<CSQueue> iter = childQueues.iterator();
iter.hasNext(); ) {
CSQueue csqueue = iter.next();
if (csqueue.equals(completedChildQueue)) {
iter.remove();
@ -750,6 +791,9 @@ private synchronized void internalReleaseResource(Resource clusterResource,
// sure we allocate from least usage (or order defined by queue policy)
// queues.
needToResortQueuesAtNextAllocation = !sortQueues;
} finally {
writeLock.unlock();
}
}
@Override
@ -806,8 +850,10 @@ public void completedContainer(Resource clusterResource,
}
@Override
public synchronized void updateClusterResource(Resource clusterResource,
public void updateClusterResource(Resource clusterResource,
ResourceLimits resourceLimits) {
try {
writeLock.lock();
// Update all children
for (CSQueue childQueue : childQueues) {
// Get ResourceLimits of child queue before assign containers
@ -819,11 +865,20 @@ public synchronized void updateClusterResource(Resource clusterResource,
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, null);
} finally {
writeLock.unlock();
}
}
@Override
public synchronized List<CSQueue> getChildQueues() {
public List<CSQueue> getChildQueues() {
try {
readLock.lock();
return new ArrayList<CSQueue>(childQueues);
} finally {
readLock.unlock();
}
}
@Override
@ -832,13 +887,18 @@ public void recoverContainer(Resource clusterResource,
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;
}
// Careful! Locking order is important!
synchronized (this) {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
try {
writeLock.lock();
FiCaSchedulerNode node = scheduler.getNode(
rmContainer.getContainer().getNodeId());
allocateResource(clusterResource,
rmContainer.getContainer().getResource(), node.getPartition(), false);
} finally {
writeLock.unlock();
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
}
@ -851,11 +911,17 @@ public ActiveUsersManager getActiveUsersManager() {
}
@Override
public synchronized void collectSchedulerApplications(
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
try {
readLock.lock();
for (CSQueue queue : childQueues) {
queue.collectSchedulerApplications(apps);
}
} finally {
readLock.unlock();
}
}
@Override
@ -897,12 +963,14 @@ public void detachContainer(Resource clusterResource,
}
}
public synchronized int getNumApplications() {
public int getNumApplications() {
return numApplications;
}
synchronized void allocateResource(Resource clusterResource,
void allocateResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) {
try {
writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition,
changeContainerResource);
@ -936,6 +1004,9 @@ synchronized void allocateResource(Resource clusterResource,
< getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
}
} finally {
writeLock.unlock();
}
}
private void killContainersToEnforceMaxQueueCapacity(String partition,

View File

@ -79,13 +79,16 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName,
}
@Override
public synchronized void reinitialize(CSQueue newlyParsedQueue,
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
try {
writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof PlanQueue)
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
throw new IOException("Trying to reinitialize " + getQueuePath()
+ " from " + newlyParsedQueue.getQueuePath());
if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
@ -109,27 +112,38 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
for (CSQueue res : this.getChildQueues()) {
res.reinitialize(res, clusterResource);
}
showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
showReservationsAsQueues =
newlyParsedParentQueue.showReservationsAsQueues;
} finally {
writeLock.unlock();
}
}
synchronized void addChildQueue(CSQueue newQueue)
void addChildQueue(CSQueue newQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
if (newQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException("Queue " + newQueue
+ " being added has non zero capacity.");
throw new SchedulerDynamicEditException(
"Queue " + newQueue + " being added has non zero capacity.");
}
boolean added = this.childQueues.add(newQueue);
if (LOG.isDebugEnabled()) {
LOG.debug("updateChildQueues (action: add queue): " + added + " "
+ getChildQueuesToPrint());
}
} finally {
writeLock.unlock();
}
}
synchronized void removeChildQueue(CSQueue remQueue)
void removeChildQueue(CSQueue remQueue)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
if (remQueue.getCapacity() > 0) {
throw new SchedulerDynamicEditException("Queue " + remQueue
+ " being removed has non zero capacity.");
throw new SchedulerDynamicEditException(
"Queue " + remQueue + " being removed has non zero capacity.");
}
Iterator<CSQueue> qiter = childQueues.iterator();
while (qiter.hasNext()) {
@ -141,14 +155,22 @@ synchronized void removeChildQueue(CSQueue remQueue)
}
}
}
} finally {
writeLock.unlock();
}
}
protected synchronized float sumOfChildCapacities() {
protected float sumOfChildCapacities() {
try {
writeLock.lock();
float ret = 0;
for (CSQueue l : childQueues) {
ret += l.getCapacity();
}
return ret;
} finally {
writeLock.unlock();
}
}
private void updateQuotas(int userLimit, float userLimitFactor,

View File

@ -51,13 +51,16 @@ public ReservationQueue(CapacitySchedulerContext cs, String queueName,
}
@Override
public synchronized void reinitialize(CSQueue newlyParsedQueue,
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
try {
writeLock.lock();
// Sanity check
if (!(newlyParsedQueue instanceof ReservationQueue)
|| !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
throw new IOException("Trying to reinitialize " + getQueuePath()
+ " from " + newlyParsedQueue.getQueuePath());
if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
.getQueuePath().equals(getQueuePath())) {
throw new IOException(
"Trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
super.reinitialize(newlyParsedQueue, clusterResource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@ -67,6 +70,9 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
parent.getUserLimitFactor(),
parent.getMaxApplicationsForReservations(),
parent.getMaxApplicationsPerUserForReservation());
} finally {
writeLock.unlock();
}
}
/**
@ -77,8 +83,10 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
* maxCapacity, etc..)
* @throws SchedulerDynamicEditException
*/
public synchronized void setEntitlement(QueueEntitlement entitlement)
public void setEntitlement(QueueEntitlement entitlement)
throws SchedulerDynamicEditException {
try {
writeLock.lock();
float capacity = entitlement.getCapacity();
if (capacity < 0 || capacity > 1.0f) {
throw new SchedulerDynamicEditException(
@ -90,8 +98,11 @@ public synchronized void setEntitlement(QueueEntitlement entitlement)
// this might be revised later
setMaxCapacity(entitlement.getMaxCapacity());
if (LOG.isDebugEnabled()) {
LOG.debug("successfully changed to " + capacity + " for queue "
+ this.getQueueName());
LOG.debug("successfully changed to " + capacity + " for queue " + this
.getQueueName());
}
} finally {
writeLock.unlock();
}
}

View File

@ -828,8 +828,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
app.getAppAttemptResourceUsage().getPending().getMemorySize());
// Queue/user/application's usage will be updated
checkUsedResource(rm1, "default", 0 * GB, null);
Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
.getUser("user").getUsed().getMemorySize());
// User will be removed
Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user"));
Assert.assertEquals(0 * GB,
app.getAppAttemptResourceUsage().getReserved().getMemorySize());
Assert.assertEquals(0 * GB,