From 3acd30df711f999e444316621d54177d53dbb4d1 Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 20 Sep 2016 15:03:07 +0800 Subject: [PATCH] YARN-3140. Improve locks in AbstractCSQueue/LeafQueue/ParentQueue. Contributed by Wangda Tan (cherry picked from commit 2b66d9ec5bdaec7e6b278926fbb6f222c4e3afaa) --- .../dev-support/findbugs-exclude.xml | 10 + .../scheduler/capacity/AbstractCSQueue.java | 368 ++-- .../scheduler/capacity/LeafQueue.java | 1857 +++++++++-------- .../scheduler/capacity/ParentQueue.java | 851 ++++---- .../scheduler/capacity/PlanQueue.java | 128 +- .../scheduler/capacity/ReservationQueue.java | 67 +- .../capacity/TestContainerResizing.java | 4 +- 7 files changed, 1817 insertions(+), 1468 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 68d3662f0fe..2be45e559e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -537,4 +537,14 @@              + + + + +     + + +     + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 1d8f9290ccb..096f5ea7e94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -60,25 +61,25 @@ import com.google.common.collect.Sets; public abstract class AbstractCSQueue implements CSQueue { private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class); - CSQueue parent; + volatile CSQueue parent; final String queueName; volatile int numContainers; final Resource minimumAllocation; volatile Resource maximumAllocation; - QueueState state; + volatile QueueState state; final CSQueueMetrics metrics; protected final PrivilegedEntity queueEntity; final ResourceCalculator resourceCalculator; Set accessibleLabels; - RMNodeLabelsManager labelManager; + final RMNodeLabelsManager labelManager; String defaultLabelExpression; Map acls = new HashMap(); volatile boolean reservationsContinueLooking; - private boolean preemptionDisabled; + private volatile boolean preemptionDisabled; // Track resource usage-by-label like used-resource/pending-resource, etc. volatile ResourceUsage queueUsage; @@ -94,6 +95,9 @@ public abstract class AbstractCSQueue implements CSQueue { protected ActivitiesManager activitiesManager; + protected ReentrantReadWriteLock.ReadLock readLock; + protected ReentrantReadWriteLock.WriteLock writeLock; + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); @@ -116,7 +120,11 @@ public abstract class AbstractCSQueue implements CSQueue { queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath()); // initialize QueueCapacities - queueCapacities = new QueueCapacities(parent == null); + queueCapacities = new QueueCapacities(parent == null); + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); } protected void setupConfigurableCapacities() { @@ -128,12 +136,12 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public synchronized float getCapacity() { + public float getCapacity() { return queueCapacities.getCapacity(); } @Override - public synchronized float getAbsoluteCapacity() { + public float getAbsoluteCapacity() { return queueCapacities.getAbsoluteCapacity(); } @@ -167,7 +175,7 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public synchronized QueueState getState() { + public QueueState getState() { return state; } @@ -187,13 +195,13 @@ public abstract class AbstractCSQueue implements CSQueue { } @Override - public synchronized CSQueue getParent() { + public CSQueue getParent() { return parent; } @Override - public synchronized void setParent(CSQueue newParentQueue) { - this.parent = (ParentQueue)newParentQueue; + public void setParent(CSQueue newParentQueue) { + this.parent = newParentQueue; } public Set getAccessibleNodeLabels() { @@ -221,18 +229,22 @@ public abstract class AbstractCSQueue implements CSQueue { * Set maximum capacity - used only for testing. * @param maximumCapacity new max capacity */ - synchronized void setMaxCapacity(float maximumCapacity) { - // Sanity check - CSQueueUtils.checkMaxCapacity(getQueueName(), - queueCapacities.getCapacity(), maximumCapacity); - float absMaxCapacity = - CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); - CSQueueUtils.checkAbsoluteCapacity(getQueueName(), - queueCapacities.getAbsoluteCapacity(), - absMaxCapacity); - - queueCapacities.setMaximumCapacity(maximumCapacity); - queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); + void setMaxCapacity(float maximumCapacity) { + try { + writeLock.lock(); + // Sanity check + CSQueueUtils.checkMaxCapacity(getQueueName(), + queueCapacities.getCapacity(), maximumCapacity); + float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity( + maximumCapacity, parent); + CSQueueUtils.checkAbsoluteCapacity(getQueueName(), + queueCapacities.getAbsoluteCapacity(), absMaxCapacity); + + queueCapacities.setMaximumCapacity(maximumCapacity); + queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity); + } finally { + writeLock.unlock(); + } } @Override @@ -240,70 +252,82 @@ public abstract class AbstractCSQueue implements CSQueue { return defaultLabelExpression; } - synchronized void setupQueueConfigs(Resource clusterResource) + void setupQueueConfigs(Resource clusterResource) throws IOException { - // get labels - this.accessibleLabels = - csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath()); - this.defaultLabelExpression = csContext.getConfiguration() - .getDefaultNodeLabelExpression(getQueuePath()); + try { + writeLock.lock(); + // get labels + this.accessibleLabels = + csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath()); + this.defaultLabelExpression = + csContext.getConfiguration().getDefaultNodeLabelExpression( + getQueuePath()); - // inherit from parent if labels not set - if (this.accessibleLabels == null && parent != null) { - this.accessibleLabels = parent.getAccessibleNodeLabels(); - } - - // inherit from parent if labels not set - if (this.defaultLabelExpression == null && parent != null - && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) { - this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); - } + // inherit from parent if labels not set + if (this.accessibleLabels == null && parent != null) { + this.accessibleLabels = parent.getAccessibleNodeLabels(); + } - // After we setup labels, we can setup capacities - setupConfigurableCapacities(); - - this.maximumAllocation = - csContext.getConfiguration().getMaximumAllocationPerQueue( - getQueuePath()); - - authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); - - this.state = csContext.getConfiguration().getState(getQueuePath()); - this.acls = csContext.getConfiguration().getAcls(getQueuePath()); + // inherit from parent if labels not set + if (this.defaultLabelExpression == null && parent != null + && this.accessibleLabels.containsAll( + parent.getAccessibleNodeLabels())) { + this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); + } - // Update metrics - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); - - // Check if labels of this queue is a subset of parent queue, only do this - // when we not root - if (parent != null && parent.getParent() != null) { - if (parent.getAccessibleNodeLabels() != null - && !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { - // if parent isn't "*", child shouldn't be "*" too - if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { - throw new IOException("Parent's accessible queue is not ANY(*), " - + "but child's accessible queue is *"); - } else { - Set diff = - Sets.difference(this.getAccessibleNodeLabels(), - parent.getAccessibleNodeLabels()); - if (!diff.isEmpty()) { - throw new IOException("Some labels of child queue is not a subset " - + "of parent queue, these labels=[" - + StringUtils.join(diff, ",") + "]"); + // After we setup labels, we can setup capacities + setupConfigurableCapacities(); + + this.maximumAllocation = + csContext.getConfiguration().getMaximumAllocationPerQueue( + getQueuePath()); + + authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf()); + + this.state = csContext.getConfiguration().getState(getQueuePath()); + this.acls = csContext.getConfiguration().getAcls(getQueuePath()); + + // Update metrics + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); + + // Check if labels of this queue is a subset of parent queue, only do this + // when we not root + if (parent != null && parent.getParent() != null) { + if (parent.getAccessibleNodeLabels() != null && !parent + .getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { + // if parent isn't "*", child shouldn't be "*" too + if (this.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY)) { + throw new IOException("Parent's accessible queue is not ANY(*), " + + "but child's accessible queue is *"); + } else{ + Set diff = Sets.difference(this.getAccessibleNodeLabels(), + parent.getAccessibleNodeLabels()); + if (!diff.isEmpty()) { + throw new IOException( + "Some labels of child queue is not a subset " + + "of parent queue, these labels=[" + StringUtils + .join(diff, ",") + "]"); + } } } } + + this.reservationsContinueLooking = + csContext.getConfiguration().getReservationContinueLook(); + + this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + } finally { + writeLock.unlock(); } - - this.reservationsContinueLooking = csContext.getConfiguration() - .getReservationContinueLook(); - - this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); } - + protected QueueInfo getQueueInfo() { + // Deliberately doesn't use lock here, because this method will be invoked + // from schedulerApplicationAttempt, to avoid deadlock, sacrifice + // consistency here. + // TODO, improve this QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); queueInfo.setQueueName(queueName); queueInfo.setAccessibleNodeLabels(accessibleLabels); @@ -318,8 +342,12 @@ public abstract class AbstractCSQueue implements CSQueue { } public QueueStatistics getQueueStatistics() { - QueueStatistics stats = - recordFactory.newRecordInstance(QueueStatistics.class); + // Deliberately doesn't use lock here, because this method will be invoked + // from schedulerApplicationAttempt, to avoid deadlock, sacrifice + // consistency here. + // TODO, improve this + QueueStatistics stats = recordFactory.newRecordInstance( + QueueStatistics.class); stats.setNumAppsSubmitted(getMetrics().getAppsSubmitted()); stats.setNumAppsRunning(getMetrics().getAppsRunning()); stats.setNumAppsPending(getMetrics().getAppsPending()); @@ -351,26 +379,36 @@ public abstract class AbstractCSQueue implements CSQueue { return minimumAllocation; } - synchronized void allocateResource(Resource clusterResource, + void allocateResource(Resource clusterResource, Resource resource, String nodePartition, boolean changeContainerResource) { - queueUsage.incUsed(nodePartition, resource); + try { + writeLock.lock(); + queueUsage.incUsed(nodePartition, resource); - if (!changeContainerResource) { - ++numContainers; + if (!changeContainerResource) { + ++numContainers; + } + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, nodePartition); + } finally { + writeLock.unlock(); } - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, nodePartition); } - protected synchronized void releaseResource(Resource clusterResource, + protected void releaseResource(Resource clusterResource, Resource resource, String nodePartition, boolean changeContainerResource) { - queueUsage.decUsed(nodePartition, resource); + try { + writeLock.lock(); + queueUsage.decUsed(nodePartition, resource); - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, nodePartition); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, nodePartition); - if (!changeContainerResource) { - --numContainers; + if (!changeContainerResource) { + --numContainers; + } + } finally { + writeLock.unlock(); } } @@ -381,7 +419,13 @@ public abstract class AbstractCSQueue implements CSQueue { @Private public Map getACLs() { - return acls; + try { + readLock.lock(); + return acls; + } finally { + readLock.unlock(); + } + } @Private @@ -464,86 +508,88 @@ public abstract class AbstractCSQueue implements CSQueue { minimumAllocation); } - synchronized boolean canAssignToThisQueue(Resource clusterResource, + boolean canAssignToThisQueue(Resource clusterResource, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) { - // Get current limited resource: - // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect - // queues' max capacity. - // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect - // queue's max capacity, queue's max capacity on the partition will be - // considered to be 100%. Which is a queue can use all resource in the - // partition. - // Doing this because: for non-exclusive allocation, we make sure there's - // idle resource on the partition, to avoid wastage, such resource will be - // leveraged as much as we can, and preemption policy will reclaim it back - // when partitoned-resource-request comes back. - Resource currentLimitResource = - getCurrentLimitResource(nodePartition, clusterResource, - currentResourceLimits, schedulingMode); + try { + readLock.lock(); + // Get current limited resource: + // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect + // queues' max capacity. + // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect + // queue's max capacity, queue's max capacity on the partition will be + // considered to be 100%. Which is a queue can use all resource in the + // partition. + // Doing this because: for non-exclusive allocation, we make sure there's + // idle resource on the partition, to avoid wastage, such resource will be + // leveraged as much as we can, and preemption policy will reclaim it back + // when partitoned-resource-request comes back. + Resource currentLimitResource = getCurrentLimitResource(nodePartition, + clusterResource, currentResourceLimits, schedulingMode); - Resource nowTotalUsed = queueUsage.getUsed(nodePartition); + Resource nowTotalUsed = queueUsage.getUsed(nodePartition); - // Set headroom for currentResourceLimits: - // When queue is a parent queue: Headroom = limit - used + killable - // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) - Resource usedExceptKillable = nowTotalUsed; - if (null != getChildQueues() && !getChildQueues().isEmpty()) { - usedExceptKillable = Resources.subtract(nowTotalUsed, - getTotalKillableResource(nodePartition)); - } - currentResourceLimits.setHeadroom( - Resources.subtract(currentLimitResource, usedExceptKillable)); + // Set headroom for currentResourceLimits: + // When queue is a parent queue: Headroom = limit - used + killable + // When queue is a leaf queue: Headroom = limit - used (leaf queue cannot preempt itself) + Resource usedExceptKillable = nowTotalUsed; + if (null != getChildQueues() && !getChildQueues().isEmpty()) { + usedExceptKillable = Resources.subtract(nowTotalUsed, + getTotalKillableResource(nodePartition)); + } + currentResourceLimits.setHeadroom( + Resources.subtract(currentLimitResource, usedExceptKillable)); - if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, - usedExceptKillable, currentLimitResource)) { + if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource, + usedExceptKillable, currentLimitResource)) { - // if reservation continous looking enabled, check to see if could we - // potentially use this node instead of a reserved node if the application - // has reserved containers. - // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking - && nodePartition.equals(RMNodeLabelsManager.NO_LABEL) - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { - // resource-without-reserved = used - reserved - Resource newTotalWithoutReservedResource = - Resources.subtract(usedExceptKillable, resourceCouldBeUnreserved); + // if reservation continous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking && nodePartition.equals( + RMNodeLabelsManager.NO_LABEL) && Resources.greaterThan( + resourceCalculator, clusterResource, resourceCouldBeUnreserved, + Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = Resources.subtract( + usedExceptKillable, resourceCouldBeUnreserved); - // when total-used-without-reserved-resource < currentLimit, we still - // have chance to allocate on this node by unreserving some containers - if (Resources.lessThan(resourceCalculator, clusterResource, - newTotalWithoutReservedResource, currentLimitResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource + ", maxLimitCapacity: " - + currentLimitResource); + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "try to use reserved: " + getQueueName() + " usedResources: " + + queueUsage.getUsed() + ", clusterResources: " + + clusterResource + ", reservedResources: " + + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + return true; } - return true; } + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + "Check assign to queue, nodePartition=" + + nodePartition + " usedResources: " + queueUsage + .getUsed(nodePartition) + " clusterResources: " + clusterResource + + " currentUsedCapacity " + Resources + .divide(resourceCalculator, clusterResource, + queueUsage.getUsed(nodePartition), labelManager + .getResourceByLabel(nodePartition, clusterResource)) + + " max-capacity: " + queueCapacities + .getAbsoluteMaximumCapacity(nodePartition) + ")"); + } + return false; } - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, nodePartition=" - + nodePartition - + " usedResources: " - + queueUsage.getUsed(nodePartition) - + " clusterResources: " - + clusterResource - + " currentUsedCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(nodePartition), - labelManager.getResourceByLabel(nodePartition, clusterResource)) - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")"); - } - return false; + return true; + } finally { + readLock.unlock(); } - return true; + } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 843149ff3a9..1c00fc08571 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -82,6 +82,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -92,11 +93,11 @@ public class LeafQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(LeafQueue.class); private float absoluteUsedCapacity = 0.0f; - private int userLimit; - private float userLimitFactor; + private volatile int userLimit; + private volatile float userLimitFactor; protected int maxApplications; - protected int maxApplicationsPerUser; + protected volatile int maxApplicationsPerUser; private float maxAMResourcePerQueuePercent; @@ -104,15 +105,15 @@ public class LeafQueue extends AbstractCSQueue { private volatile boolean rackLocalityFullReset; Map applicationAttemptMap = - new HashMap(); + new ConcurrentHashMap<>(); private Priority defaultAppPriorityPerQueue; - private OrderingPolicy pendingOrderingPolicy = null; + private final OrderingPolicy pendingOrderingPolicy; private volatile float minimumAllocationFactor; - private Map users = new HashMap(); + private Map users = new ConcurrentHashMap<>(); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -129,7 +130,7 @@ public class LeafQueue extends AbstractCSQueue { private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; - private OrderingPolicy orderingPolicy = null; + private volatile OrderingPolicy orderingPolicy = null; // Summation of consumed ratios for all users in queue private float totalUserConsumedRatio = 0; @@ -138,7 +139,7 @@ public class LeafQueue extends AbstractCSQueue { // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on private Map> ignorePartitionExclusivityRMContainers = - new HashMap<>(); + new ConcurrentHashMap<>(); @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, @@ -161,125 +162,125 @@ public class LeafQueue extends AbstractCSQueue { setupQueueConfigs(cs.getClusterResource()); } - protected synchronized void setupQueueConfigs(Resource clusterResource) + protected void setupQueueConfigs(Resource clusterResource) throws IOException { - super.setupQueueConfigs(clusterResource); - - this.lastClusterResource = clusterResource; - - this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource); - - // Initialize headroom info, also used for calculating application - // master resource limits. Since this happens during queue initialization - // and all queues may not be realized yet, we'll use (optimistic) - // absoluteMaxCapacity (it will be replaced with the more accurate - // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) - setQueueResourceLimitsInfo(clusterResource); + try { + writeLock.lock(); + super.setupQueueConfigs(clusterResource); - CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + this.lastClusterResource = clusterResource; - setOrderingPolicy(conf.getOrderingPolicy(getQueuePath())); + this.cachedResourceLimitsForHeadroom = new ResourceLimits( + clusterResource); - userLimit = conf.getUserLimit(getQueuePath()); - userLimitFactor = conf.getUserLimitFactor(getQueuePath()); + // Initialize headroom info, also used for calculating application + // master resource limits. Since this happens during queue initialization + // and all queues may not be realized yet, we'll use (optimistic) + // absoluteMaxCapacity (it will be replaced with the more accurate + // absoluteMaxAvailCapacity during headroom/userlimit/allocation events) + setQueueResourceLimitsInfo(clusterResource); - maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); - if (maxApplications < 0) { - int maxSystemApps = conf.getMaximumSystemApplications(); - maxApplications = - (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); - } - maxApplicationsPerUser = Math.min(maxApplications, - (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor)); - - maxAMResourcePerQueuePercent = - conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath()); + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - if (!SchedulerUtils.checkQueueLabelExpression( - this.accessibleLabels, this.defaultLabelExpression, null)) { - throw new IOException("Invalid default label expression of " - + " queue=" - + getQueueName() - + " doesn't have permission to access all labels " - + "in default label expression. labelExpression of resource request=" - + (this.defaultLabelExpression == null ? "" - : this.defaultLabelExpression) - + ". Queue labels=" - + (getAccessibleNodeLabels() == null ? "" : StringUtils.join( - getAccessibleNodeLabels().iterator(), ','))); - } - - nodeLocalityDelay = conf.getNodeLocalityDelay(); - rackLocalityFullReset = conf.getRackLocalityFullReset(); + setOrderingPolicy( + conf.getOrderingPolicy(getQueuePath())); - // re-init this since max allocation could have changed - this.minimumAllocationFactor = - Resources.ratio(resourceCalculator, - Resources.subtract(maximumAllocation, minimumAllocation), - maximumAllocation); + userLimit = conf.getUserLimit(getQueuePath()); + userLimitFactor = conf.getUserLimitFactor(getQueuePath()); - StringBuilder aclsString = new StringBuilder(); - for (Map.Entry e : acls.entrySet()) { - aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); - } - - StringBuilder labelStrBuilder = new StringBuilder(); - if (accessibleLabels != null) { - for (String s : accessibleLabels) { - labelStrBuilder.append(s); - labelStrBuilder.append(","); + maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); + if (maxApplications < 0) { + int maxSystemApps = conf.getMaximumSystemApplications(); + maxApplications = + (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); } + maxApplicationsPerUser = Math.min(maxApplications, + (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor)); + + maxAMResourcePerQueuePercent = + conf.getMaximumApplicationMasterResourcePerQueuePercent( + getQueuePath()); + + if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels, + this.defaultLabelExpression, null)) { + throw new IOException( + "Invalid default label expression of " + " queue=" + getQueueName() + + " doesn't have permission to access all labels " + + "in default label expression. labelExpression of resource request=" + + (this.defaultLabelExpression == null ? + "" : + this.defaultLabelExpression) + ". Queue labels=" + ( + getAccessibleNodeLabels() == null ? + "" : + StringUtils + .join(getAccessibleNodeLabels().iterator(), ','))); + } + + nodeLocalityDelay = conf.getNodeLocalityDelay(); + rackLocalityFullReset = conf.getRackLocalityFullReset(); + + // re-init this since max allocation could have changed + this.minimumAllocationFactor = Resources.ratio(resourceCalculator, + Resources.subtract(maximumAllocation, minimumAllocation), + maximumAllocation); + + StringBuilder aclsString = new StringBuilder(); + for (Map.Entry e : acls.entrySet()) { + aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); + } + + StringBuilder labelStrBuilder = new StringBuilder(); + if (accessibleLabels != null) { + for (String s : accessibleLabels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } + + defaultAppPriorityPerQueue = Priority.newInstance( + conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); + + LOG.info( + "Initializing " + queueName + "\n" + "capacity = " + queueCapacities + .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" + + "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() + + " [= parentAbsoluteCapacity * capacity ]" + "\n" + + "maxCapacity = " + queueCapacities.getMaximumCapacity() + + " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = " + + queueCapacities.getAbsoluteMaximumCapacity() + + " [= 1.0 maximumCapacity undefined, " + + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]" + + "\n" + "userLimitFactor = " + userLimitFactor + + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = " + + maxApplications + + " [= configuredMaximumSystemApplicationsPerQueue or" + + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser + + " [= (int)(maxApplications * (userLimit / 100.0f) * " + + "userLimitFactor) ]" + "\n" + "usedCapacity = " + + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / " + + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + + "absoluteUsedCapacity = " + absoluteUsedCapacity + + " [= usedResourcesMemory / clusterResourceMemory]" + "\n" + + "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent + + " [= configuredMaximumAMResourcePercent ]" + "\n" + + "minimumAllocationFactor = " + minimumAllocationFactor + + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " + + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " + + maximumAllocation + " [= configuredMaxAllocation ]" + "\n" + + "numContainers = " + numContainers + + " [= currentNumContainers ]" + "\n" + "state = " + state + + " [= configuredState ]" + "\n" + "acls = " + aclsString + + " [= configuredAcls ]" + "\n" + "nodeLocalityDelay = " + + nodeLocalityDelay + "\n" + "labels=" + labelStrBuilder + .toString() + "\n" + "reservationsContinueLooking = " + + reservationsContinueLooking + "\n" + "preemptionDisabled = " + + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = " + + defaultAppPriorityPerQueue); + } finally { + writeLock.unlock(); } - - defaultAppPriorityPerQueue = Priority.newInstance(conf - .getDefaultApplicationPriorityConfPerQueue(getQueuePath())); - - LOG.info("Initializing " + queueName + "\n" + - "capacity = " + queueCapacities.getCapacity() + - " [= (float) configuredCapacity / 100 ]" + "\n" + - "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity() + - " [= parentAbsoluteCapacity * capacity ]" + "\n" + - "maxCapacity = " + queueCapacities.getMaximumCapacity() + - " [= configuredMaxCapacity ]" + "\n" + - "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() + - " [= 1.0 maximumCapacity undefined, " + - "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + - "\n" + - "userLimit = " + userLimit + - " [= configuredUserLimit ]" + "\n" + - "userLimitFactor = " + userLimitFactor + - " [= configuredUserLimitFactor ]" + "\n" + - "maxApplications = " + maxApplications + - " [= configuredMaximumSystemApplicationsPerQueue or" + - " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + - "\n" + - "maxApplicationsPerUser = " + maxApplicationsPerUser + - " [= (int)(maxApplications * (userLimit / 100.0f) * " + - "userLimitFactor) ]" + "\n" + - "usedCapacity = " + queueCapacities.getUsedCapacity() + - " [= usedResourcesMemory / " + - "(clusterResourceMemory * absoluteCapacity)]" + "\n" + - "absoluteUsedCapacity = " + absoluteUsedCapacity + - " [= usedResourcesMemory / clusterResourceMemory]" + "\n" + - "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent + - " [= configuredMaximumAMResourcePercent ]" + "\n" + - "minimumAllocationFactor = " + minimumAllocationFactor + - " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " + - "maximumAllocationMemory ]" + "\n" + - "maximumAllocation = " + maximumAllocation + - " [= configuredMaxAllocation ]" + "\n" + - "numContainers = " + numContainers + - " [= currentNumContainers ]" + "\n" + - "state = " + state + - " [= configuredState ]" + "\n" + - "acls = " + aclsString + - " [= configuredAcls ]" + "\n" + - "nodeLocalityDelay = " + nodeLocalityDelay + "\n" + - "labels=" + labelStrBuilder.toString() + "\n" + - "reservationsContinueLooking = " + - reservationsContinueLooking + "\n" + - "preemptionDisabled = " + getPreemptionDisabled() + "\n" + - "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue); } @Override @@ -307,7 +308,7 @@ public class LeafQueue extends AbstractCSQueue { return maxApplications; } - public synchronized int getMaxApplicationsPerUser() { + public int getMaxApplicationsPerUser() { return maxApplicationsPerUser; } @@ -325,7 +326,8 @@ public class LeafQueue extends AbstractCSQueue { * Set user limit - used only for testing. * @param userLimit new user limit */ - synchronized void setUserLimit(int userLimit) { + @VisibleForTesting + void setUserLimit(int userLimit) { this.userLimit = userLimit; } @@ -333,50 +335,74 @@ public class LeafQueue extends AbstractCSQueue { * Set user limit factor - used only for testing. * @param userLimitFactor new user limit factor */ - synchronized void setUserLimitFactor(float userLimitFactor) { + @VisibleForTesting + void setUserLimitFactor(float userLimitFactor) { this.userLimitFactor = userLimitFactor; } @Override - public synchronized int getNumApplications() { - return getNumPendingApplications() + getNumActiveApplications(); + public int getNumApplications() { + try { + readLock.lock(); + return getNumPendingApplications() + getNumActiveApplications(); + } finally { + readLock.unlock(); + } } - public synchronized int getNumPendingApplications() { - return pendingOrderingPolicy.getNumSchedulableEntities(); + public int getNumPendingApplications() { + try { + readLock.lock(); + return pendingOrderingPolicy.getNumSchedulableEntities(); + } finally { + readLock.unlock(); + } } - public synchronized int getNumActiveApplications() { - return orderingPolicy.getNumSchedulableEntities(); + public int getNumActiveApplications() { + try { + readLock.lock(); + return orderingPolicy.getNumSchedulableEntities(); + } finally { + readLock.unlock(); + } } @Private - public synchronized int getNumApplications(String user) { - return getUser(user).getTotalApplications(); + public int getNumPendingApplications(String user) { + try { + readLock.lock(); + User u = getUser(user); + if (null == u) { + return 0; + } + return u.getPendingApplications(); + } finally { + readLock.unlock(); + } } @Private - public synchronized int getNumPendingApplications(String user) { - return getUser(user).getPendingApplications(); + public int getNumActiveApplications(String user) { + try { + readLock.lock(); + User u = getUser(user); + if (null == u) { + return 0; + } + return u.getActiveApplications(); + } finally { + readLock.unlock(); + } } @Private - public synchronized int getNumActiveApplications(String user) { - return getUser(user).getActiveApplications(); - } - - @Override - public synchronized QueueState getState() { - return state; - } - - @Private - public synchronized int getUserLimit() { + public int getUserLimit() { return userLimit; } @Private - public synchronized float getUserLimitFactor() { + public float getUserLimitFactor() { return userLimitFactor; } @@ -388,112 +414,145 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public synchronized List + public List getQueueUserAclInfo(UserGroupInformation user) { - QueueUserACLInfo userAclInfo = - recordFactory.newRecordInstance(QueueUserACLInfo.class); - List operations = new ArrayList(); - for (QueueACL operation : QueueACL.values()) { - if (hasAccess(operation, user)) { - operations.add(operation); + try { + readLock.lock(); + QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( + QueueUserACLInfo.class); + List operations = new ArrayList<>(); + for (QueueACL operation : QueueACL.values()) { + if (hasAccess(operation, user)) { + operations.add(operation); + } } + + userAclInfo.setQueueName(getQueueName()); + userAclInfo.setUserAcls(operations); + return Collections.singletonList(userAclInfo); + } finally { + readLock.unlock(); } - userAclInfo.setQueueName(getQueueName()); - userAclInfo.setUserAcls(operations); - return Collections.singletonList(userAclInfo); } public String toString() { - return queueName + ": " + - "capacity=" + queueCapacities.getCapacity() + ", " + - "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + - "usedResources=" + queueUsage.getUsed() + ", " + - "usedCapacity=" + getUsedCapacity() + ", " + - "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + - "numApps=" + getNumApplications() + ", " + - "numContainers=" + getNumContainers(); - } - - @VisibleForTesting - public synchronized void setNodeLabelManager(RMNodeLabelsManager mgr) { - this.labelManager = mgr; + try { + readLock.lock(); + return queueName + ": " + "capacity=" + queueCapacities.getCapacity() + + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + + ", " + "usedResources=" + queueUsage.getUsed() + ", " + + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() + + ", " + "numContainers=" + getNumContainers(); + } finally { + readLock.unlock(); + } + } @VisibleForTesting - public synchronized User getUser(String userName) { - User user = users.get(userName); - if (user == null) { - user = new User(); - users.put(userName, user); + public User getUser(String userName) { + return users.get(userName); + } + + // Get and add user if absent + private User getUserAndAddIfAbsent(String userName) { + try { + writeLock.lock(); + User u = users.get(userName); + if (null == u) { + u = new User(); + users.put(userName, u); + } + return u; + } finally { + writeLock.unlock(); } - return user; } /** * @return an ArrayList of UserInfo objects who are active in this queue */ - public synchronized ArrayList getUsers() { - ArrayList usersToReturn = new ArrayList(); - for (Map.Entry entry : users.entrySet()) { - User user = entry.getValue(); - usersToReturn.add(new UserInfo(entry.getKey(), Resources.clone(user - .getAllUsed()), user.getActiveApplications(), user - .getPendingApplications(), Resources.clone(user - .getConsumedAMResources()), Resources.clone(user - .getUserResourceLimit()), user.getResourceUsage())); + public ArrayList getUsers() { + try { + readLock.lock(); + ArrayList usersToReturn = new ArrayList(); + for (Map.Entry entry : users.entrySet()) { + User user = entry.getValue(); + usersToReturn.add( + new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()), + user.getActiveApplications(), user.getPendingApplications(), + Resources.clone(user.getConsumedAMResources()), + Resources.clone(user.getUserResourceLimit()), + user.getResourceUsage())); + } + return usersToReturn; + } finally { + readLock.unlock(); } - return usersToReturn; } @Override - public synchronized void reinitialize( + public void reinitialize( CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof LeafQueue) || - !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + newlyParsedQueue.getQueuePath()); + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + + LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue; + + // don't allow the maximum allocation to be decreased in size + // since we have already told running AM's the size + Resource oldMax = getMaximumAllocation(); + Resource newMax = newlyParsedLeafQueue.getMaximumAllocation(); + if (newMax.getMemorySize() < oldMax.getMemorySize() + || newMax.getVirtualCores() < oldMax.getVirtualCores()) { + throw new IOException("Trying to reinitialize " + getQueuePath() + + " the maximum allocation size can not be decreased!" + + " Current setting: " + oldMax + ", trying to set it to: " + + newMax); + } + + setupQueueConfigs(clusterResource); + + // queue metrics are updated, more resource may be available + // activate the pending applications if possible + activateApplications(); + } finally { + writeLock.unlock(); } - - LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue; - - // don't allow the maximum allocation to be decreased in size - // since we have already told running AM's the size - Resource oldMax = getMaximumAllocation(); - Resource newMax = newlyParsedLeafQueue.getMaximumAllocation(); - if (newMax.getMemorySize() < oldMax.getMemorySize() - || newMax.getVirtualCores() < oldMax.getVirtualCores()) { - throw new IOException( - "Trying to reinitialize " - + getQueuePath() - + " the maximum allocation size can not be decreased!" - + " Current setting: " + oldMax - + ", trying to set it to: " + newMax); - } - - setupQueueConfigs(clusterResource); - - // queue metrics are updated, more resource may be available - // activate the pending applications if possible - activateApplications(); } @Override public void submitApplicationAttempt(FiCaSchedulerApp application, String userName) { // Careful! Locking order is important! - synchronized (this) { - User user = getUser(userName); + try { + writeLock.lock(); + + // TODO, should use getUser, use this method just to avoid UT failure + // which is caused by wrong invoking order, will fix UT separately + User user = getUserAndAddIfAbsent(userName); + // Add the attempt to our data-structures addApplicationAttempt(application, user); + } finally { + writeLock.unlock(); } // We don't want to update metrics for move app if (application.isPending()) { metrics.submitAppAttempt(userName); } + getParent().submitApplicationAttempt(application, userName); } @@ -501,37 +560,38 @@ public class LeafQueue extends AbstractCSQueue { public void submitApplication(ApplicationId applicationId, String userName, String queue) throws AccessControlException { // Careful! Locking order is important! - - User user = null; - synchronized (this) { - + try { + writeLock.lock(); // Check if the queue is accepting jobs if (getState() != QueueState.RUNNING) { - String msg = "Queue " + getQueuePath() + - " is STOPPED. Cannot accept submission of application: " + applicationId; + String msg = "Queue " + getQueuePath() + + " is STOPPED. Cannot accept submission of application: " + + applicationId; LOG.info(msg); throw new AccessControlException(msg); } // Check submission limits for queues if (getNumApplications() >= getMaxApplications()) { - String msg = "Queue " + getQueuePath() + - " already has " + getNumApplications() + " applications," + - " cannot accept submission of application: " + applicationId; + String msg = + "Queue " + getQueuePath() + " already has " + getNumApplications() + + " applications," + + " cannot accept submission of application: " + applicationId; LOG.info(msg); throw new AccessControlException(msg); } // Check submission limits for the user on this queue - user = getUser(userName); + User user = getUserAndAddIfAbsent(userName); if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { - String msg = "Queue " + getQueuePath() + - " already has " + user.getTotalApplications() + - " applications from user " + userName + - " cannot accept submission of application: " + applicationId; + String msg = "Queue " + getQueuePath() + " already has " + user + .getTotalApplications() + " applications from user " + userName + + " cannot accept submission of application: " + applicationId; LOG.info(msg); throw new AccessControlException(msg); } + } finally { + writeLock.unlock(); } // Inform the parent queue @@ -553,214 +613,237 @@ public class LeafQueue extends AbstractCSQueue { return queueUsage.getAMLimit(nodePartition); } - public synchronized Resource calculateAndGetAMResourceLimit() { + @VisibleForTesting + public Resource calculateAndGetAMResourceLimit() { return calculateAndGetAMResourceLimitPerPartition( RMNodeLabelsManager.NO_LABEL); } @VisibleForTesting - public synchronized Resource getUserAMResourceLimit() { + public Resource getUserAMResourceLimit() { return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL); } - public synchronized Resource getUserAMResourceLimitPerPartition( + public Resource getUserAMResourceLimitPerPartition( String nodePartition) { - /* - * The user am resource limit is based on the same approach as the user - * limit (as it should represent a subset of that). This means that it uses - * the absolute queue capacity (per partition) instead of the max and is - * modified by the userlimit and the userlimit factor as is the userlimit - */ - float effectiveUserLimit = Math.max(userLimit / 100.0f, - 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); + try { + readLock.lock(); + /* + * The user am resource limit is based on the same approach as the user + * limit (as it should represent a subset of that). This means that it uses + * the absolute queue capacity (per partition) instead of the max and is + * modified by the userlimit and the userlimit factor as is the userlimit + */ + float effectiveUserLimit = Math.max(userLimit / 100.0f, + 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); - Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), - queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); + Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( + resourceCalculator, + labelManager.getResourceByLabel(nodePartition, lastClusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), + minimumAllocation); - Resource userAMLimit = Resources.multiplyAndNormalizeUp(resourceCalculator, - queuePartitionResource, - queueCapacities.getMaxAMResourcePercentage(nodePartition) - * effectiveUserLimit * userLimitFactor, minimumAllocation); - return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - userAMLimit, getAMResourceLimitPerPartition(nodePartition)) - ? userAMLimit - : getAMResourceLimitPerPartition(nodePartition); - } - - public synchronized Resource calculateAndGetAMResourceLimitPerPartition( - String nodePartition) { - /* - * For non-labeled partition, get the max value from resources currently - * available to the queue and the absolute resources guaranteed for the - * partition in the queue. For labeled partition, consider only the absolute - * resources guaranteed. Multiply this value (based on labeled/ - * non-labeled), * with per-partition am-resource-percent to get the max am - * resource limit for this queue and partition. - */ - Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), - queueCapacities.getAbsoluteCapacity(nodePartition), minimumAllocation); - - Resource queueCurrentLimit = Resources.none(); - // For non-labeled partition, we need to consider the current queue - // usage limit. - if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { - synchronized (queueResourceLimitsInfo) { - queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); - } + Resource userAMLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionResource, + queueCapacities.getMaxAMResourcePercentage(nodePartition) + * effectiveUserLimit * userLimitFactor, minimumAllocation); + return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ? + userAMLimit : + getAMResourceLimitPerPartition(nodePartition); + } finally { + readLock.unlock(); } - float amResourcePercent = queueCapacities - .getMaxAMResourcePercentage(nodePartition); - - // Current usable resource for this queue and partition is the max of - // queueCurrentLimit and queuePartitionResource. - Resource queuePartitionUsableResource = Resources.max(resourceCalculator, - lastClusterResource, queueCurrentLimit, queuePartitionResource); - - Resource amResouceLimit = Resources.multiplyAndNormalizeUp( - resourceCalculator, queuePartitionUsableResource, amResourcePercent, - minimumAllocation); - - metrics.setAMResouceLimit(amResouceLimit); - queueUsage.setAMLimit(nodePartition, amResouceLimit); - return amResouceLimit; } - private synchronized void activateApplications() { - // limit of allowed resource usage for application masters - Map userAmPartitionLimit = - new HashMap(); + public Resource calculateAndGetAMResourceLimitPerPartition( + String nodePartition) { + try { + writeLock.lock(); + /* + * For non-labeled partition, get the max value from resources currently + * available to the queue and the absolute resources guaranteed for the + * partition in the queue. For labeled partition, consider only the absolute + * resources guaranteed. Multiply this value (based on labeled/ + * non-labeled), * with per-partition am-resource-percent to get the max am + * resource limit for this queue and partition. + */ + Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( + resourceCalculator, + labelManager.getResourceByLabel(nodePartition, lastClusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), + minimumAllocation); - // AM Resource Limit for accessible labels can be pre-calculated. - // This will help in updating AMResourceLimit for all labels when queue - // is initialized for the first time (when no applications are present). - for (String nodePartition : getNodeLabelsForQueue()) { - calculateAndGetAMResourceLimitPerPartition(nodePartition); - } - - for (Iterator fsApp = - getPendingAppsOrderingPolicy().getAssignmentIterator(); - fsApp.hasNext();) { - FiCaSchedulerApp application = fsApp.next(); - ApplicationId applicationId = application.getApplicationId(); - - // Get the am-node-partition associated with each application - // and calculate max-am resource limit for this partition. - String partitionName = application.getAppAMNodePartitionName(); - - Resource amLimit = getAMResourceLimitPerPartition(partitionName); - // Verify whether we already calculated am-limit for this label. - if (amLimit == null) { - amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName); - } - // Check am resource limit. - Resource amIfStarted = Resources.add( - application.getAMResource(partitionName), - queueUsage.getAMUsed(partitionName)); - - if (LOG.isDebugEnabled()) { - LOG.debug("application "+application.getId() +" AMResource " - + application.getAMResource(partitionName) - + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent - + " amLimit " + amLimit + " lastClusterResource " - + lastClusterResource + " amIfStarted " + amIfStarted - + " AM node-partition name " + partitionName); - } - - if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - amIfStarted, amLimit)) { - if (getNumActiveApplications() < 1 - || (Resources.lessThanOrEqual(resourceCalculator, - lastClusterResource, queueUsage.getAMUsed(partitionName), - Resources.none()))) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" - + " single application in queue, it is likely set too low." - + " skipping enforcement to allow at least one application" - + " to start"); - } else { - application.updateAMContainerDiagnostics(AMState.INACTIVATED, - CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED); - LOG.info("Not activating application " + applicationId - + " as amIfStarted: " + amIfStarted + " exceeds amLimit: " - + amLimit); - continue; + Resource queueCurrentLimit = Resources.none(); + // For non-labeled partition, we need to consider the current queue + // usage limit. + if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + synchronized (queueResourceLimitsInfo){ + queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit(); } } - // Check user am resource limit - User user = getUser(application.getUser()); - Resource userAMLimit = userAmPartitionLimit.get(partitionName); + float amResourcePercent = queueCapacities.getMaxAMResourcePercentage( + nodePartition); - // Verify whether we already calculated user-am-limit for this label. - if (userAMLimit == null) { - userAMLimit = getUserAMResourceLimitPerPartition(partitionName); - userAmPartitionLimit.put(partitionName, userAMLimit); + // Current usable resource for this queue and partition is the max of + // queueCurrentLimit and queuePartitionResource. + Resource queuePartitionUsableResource = Resources.max(resourceCalculator, + lastClusterResource, queueCurrentLimit, queuePartitionResource); + + Resource amResouceLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionUsableResource, amResourcePercent, + minimumAllocation); + + metrics.setAMResouceLimit(amResouceLimit); + queueUsage.setAMLimit(nodePartition, amResouceLimit); + return amResouceLimit; + } finally { + writeLock.unlock(); + } + } + + private void activateApplications() { + try { + writeLock.lock(); + // limit of allowed resource usage for application masters + Map userAmPartitionLimit = + new HashMap(); + + // AM Resource Limit for accessible labels can be pre-calculated. + // This will help in updating AMResourceLimit for all labels when queue + // is initialized for the first time (when no applications are present). + for (String nodePartition : getNodeLabelsForQueue()) { + calculateAndGetAMResourceLimitPerPartition(nodePartition); } - Resource userAmIfStarted = Resources.add( - application.getAMResource(partitionName), - user.getConsumedAMResources(partitionName)); + for (Iterator fsApp = + getPendingAppsOrderingPolicy().getAssignmentIterator(); + fsApp.hasNext(); ) { + FiCaSchedulerApp application = fsApp.next(); + ApplicationId applicationId = application.getApplicationId(); - if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, - userAmIfStarted, userAMLimit)) { - if (getNumActiveApplications() < 1 - || (Resources.lessThanOrEqual(resourceCalculator, - lastClusterResource, queueUsage.getAMUsed(partitionName), - Resources.none()))) { - LOG.warn("maximum-am-resource-percent is insufficient to start a" - + " single application in queue for user, it is likely set too" - + " low. skipping enforcement to allow at least one application" - + " to start"); - } else { - application.updateAMContainerDiagnostics(AMState.INACTIVATED, - CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED); - LOG.info("Not activating application " + applicationId - + " for user: " + user + " as userAmIfStarted: " - + userAmIfStarted + " exceeds userAmLimit: " + userAMLimit); - continue; + // Get the am-node-partition associated with each application + // and calculate max-am resource limit for this partition. + String partitionName = application.getAppAMNodePartitionName(); + + Resource amLimit = getAMResourceLimitPerPartition(partitionName); + // Verify whether we already calculated am-limit for this label. + if (amLimit == null) { + amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName); } - } - user.activateApplication(); - orderingPolicy.addSchedulableEntity(application); - application.updateAMContainerDiagnostics(AMState.ACTIVATED, null); + // Check am resource limit. + Resource amIfStarted = Resources.add( + application.getAMResource(partitionName), + queueUsage.getAMUsed(partitionName)); - queueUsage.incAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().incAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().setAMLimit(partitionName, userAMLimit); - metrics.incAMUsed(application.getUser(), - application.getAMResource(partitionName)); - metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); - fsApp.remove(); - LOG.info("Application " + applicationId + " from user: " - + application.getUser() + " activated in queue: " + getQueueName()); + if (LOG.isDebugEnabled()) { + LOG.debug("application " + application.getId() + " AMResource " + + application.getAMResource(partitionName) + + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent + + " amLimit " + amLimit + " lastClusterResource " + + lastClusterResource + " amIfStarted " + amIfStarted + + " AM node-partition name " + partitionName); + } + + if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + amIfStarted, amLimit)) { + if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, + queueUsage.getAMUsed(partitionName), Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue, it is likely set too low." + + " skipping enforcement to allow at least one application" + + " to start"); + } else{ + application.updateAMContainerDiagnostics(AMState.INACTIVATED, + CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED); + LOG.info("Not activating application " + applicationId + + " as amIfStarted: " + amIfStarted + " exceeds amLimit: " + + amLimit); + continue; + } + } + + // Check user am resource limit + User user = getUser(application.getUser()); + Resource userAMLimit = userAmPartitionLimit.get(partitionName); + + // Verify whether we already calculated user-am-limit for this label. + if (userAMLimit == null) { + userAMLimit = getUserAMResourceLimitPerPartition(partitionName); + userAmPartitionLimit.put(partitionName, userAMLimit); + } + + Resource userAmIfStarted = Resources.add( + application.getAMResource(partitionName), + user.getConsumedAMResources(partitionName)); + + if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, + userAmIfStarted, userAMLimit)) { + if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( + resourceCalculator, lastClusterResource, + queueUsage.getAMUsed(partitionName), Resources.none()))) { + LOG.warn("maximum-am-resource-percent is insufficient to start a" + + " single application in queue for user, it is likely set too" + + " low. skipping enforcement to allow at least one application" + + " to start"); + } else{ + application.updateAMContainerDiagnostics(AMState.INACTIVATED, + CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED); + LOG.info( + "Not activating application " + applicationId + " for user: " + + user + " as userAmIfStarted: " + userAmIfStarted + + " exceeds userAmLimit: " + userAMLimit); + continue; + } + } + user.activateApplication(); + orderingPolicy.addSchedulableEntity(application); + application.updateAMContainerDiagnostics(AMState.ACTIVATED, null); + + queueUsage.incAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().incAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().setAMLimit(partitionName, userAMLimit); + metrics.incAMUsed(application.getUser(), + application.getAMResource(partitionName)); + metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); + fsApp.remove(); + LOG.info("Application " + applicationId + " from user: " + application + .getUser() + " activated in queue: " + getQueueName()); + } + } finally { + writeLock.unlock(); } } - private synchronized void addApplicationAttempt(FiCaSchedulerApp application, + private void addApplicationAttempt(FiCaSchedulerApp application, User user) { - // Accept - user.submitApplication(); - getPendingAppsOrderingPolicy().addSchedulableEntity(application); - applicationAttemptMap.put(application.getApplicationAttemptId(), application); + try { + writeLock.lock(); + // Accept + user.submitApplication(); + getPendingAppsOrderingPolicy().addSchedulableEntity(application); + applicationAttemptMap.put(application.getApplicationAttemptId(), + application); - // Activate applications - activateApplications(); - - LOG.info("Application added -" + - " appId: " + application.getApplicationId() + - " user: " + application.getUser() + "," + - " leaf-queue: " + getQueueName() + - " #user-pending-applications: " + user.getPendingApplications() + - " #user-active-applications: " + user.getActiveApplications() + - " #queue-pending-applications: " + getNumPendingApplications() + - " #queue-active-applications: " + getNumActiveApplications() - ); + // Activate applications + activateApplications(); + + LOG.info( + "Application added -" + " appId: " + application.getApplicationId() + + " user: " + application.getUser() + "," + " leaf-queue: " + + getQueueName() + " #user-pending-applications: " + user + .getPendingApplications() + " #user-active-applications: " + user + .getActiveApplications() + " #queue-pending-applications: " + + getNumPendingApplications() + " #queue-active-applications: " + + getNumActiveApplications()); + } finally { + writeLock.unlock(); + } } @Override @@ -774,49 +857,54 @@ public class LeafQueue extends AbstractCSQueue { @Override public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) { // Careful! Locking order is important! - synchronized (this) { - removeApplicationAttempt(application, getUser(application.getUser())); - } + removeApplicationAttempt(application, application.getUser()); getParent().finishApplicationAttempt(application, queue); } - public synchronized void removeApplicationAttempt( - FiCaSchedulerApp application, User user) { - String partitionName = application.getAppAMNodePartitionName(); - boolean wasActive = - orderingPolicy.removeSchedulableEntity(application); - if (!wasActive) { - pendingOrderingPolicy.removeSchedulableEntity(application); - } else { - queueUsage.decAMUsed(partitionName, - application.getAMResource(partitionName)); - user.getResourceUsage().decAMUsed(partitionName, - application.getAMResource(partitionName)); - metrics.decAMUsed(application.getUser(), - application.getAMResource(partitionName)); + private void removeApplicationAttempt( + FiCaSchedulerApp application, String userName) { + try { + writeLock.lock(); + + // TODO, should use getUser, use this method just to avoid UT failure + // which is caused by wrong invoking order, will fix UT separately + User user = getUserAndAddIfAbsent(userName); + + String partitionName = application.getAppAMNodePartitionName(); + boolean wasActive = orderingPolicy.removeSchedulableEntity(application); + if (!wasActive) { + pendingOrderingPolicy.removeSchedulableEntity(application); + } else{ + queueUsage.decAMUsed(partitionName, + application.getAMResource(partitionName)); + user.getResourceUsage().decAMUsed(partitionName, + application.getAMResource(partitionName)); + metrics.decAMUsed(application.getUser(), + application.getAMResource(partitionName)); + } + applicationAttemptMap.remove(application.getApplicationAttemptId()); + + user.finishApplication(wasActive); + if (user.getTotalApplications() == 0) { + users.remove(application.getUser()); + } + + // Check if we can activate more applications + activateApplications(); + + LOG.info( + "Application removed -" + " appId: " + application.getApplicationId() + + " user: " + application.getUser() + " queue: " + getQueueName() + + " #user-pending-applications: " + user.getPendingApplications() + + " #user-active-applications: " + user.getActiveApplications() + + " #queue-pending-applications: " + getNumPendingApplications() + + " #queue-active-applications: " + getNumActiveApplications()); + } finally { + writeLock.unlock(); } - applicationAttemptMap.remove(application.getApplicationAttemptId()); - - user.finishApplication(wasActive); - if (user.getTotalApplications() == 0) { - users.remove(application.getUser()); - } - - // Check if we can activate more applications - activateApplications(); - - LOG.info("Application removed -" + - " appId: " + application.getApplicationId() + - " user: " + application.getUser() + - " queue: " + getQueueName() + - " #user-pending-applications: " + user.getPendingApplications() + - " #user-active-applications: " + user.getActiveApplications() + - " #queue-pending-applications: " + getNumPendingApplications() + - " #queue-active-applications: " + getNumActiveApplications() - ); } - private synchronized FiCaSchedulerApp getApplication( + private FiCaSchedulerApp getApplication( ApplicationAttemptId applicationAttemptId) { return applicationAttemptMap.get(applicationAttemptId); } @@ -878,170 +966,171 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public synchronized CSAssignment assignContainers(Resource clusterResource, + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) { - updateCurrentResourceLimits(currentResourceLimits, clusterResource); + try { + writeLock.lock(); + updateCurrentResourceLimits(currentResourceLimits, clusterResource); - if (LOG.isDebugEnabled()) { - LOG.debug("assignContainers: node=" + node.getNodeName() - + " #applications=" + orderingPolicy.getNumSchedulableEntities()); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "assignContainers: node=" + node.getNodeName() + " #applications=" + + orderingPolicy.getNumSchedulableEntities()); + } - setPreemptionAllowed(currentResourceLimits, node.getPartition()); + setPreemptionAllowed(currentResourceLimits, node.getPartition()); - // Check for reserved resources - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FiCaSchedulerApp application = - getApplication(reservedContainer.getApplicationAttemptId()); + // Check for reserved resources + RMContainer reservedContainer = node.getReservedContainer(); + if (reservedContainer != null) { + FiCaSchedulerApp application = getApplication( + reservedContainer.getApplicationAttemptId()); - ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); - synchronized (application) { - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, reservedContainer); + CSAssignment assignment = application.assignContainers(clusterResource, + node, currentResourceLimits, schedulingMode, reservedContainer); handleExcessReservedContainer(clusterResource, assignment, node, application); killToPreemptContainers(clusterResource, node, assignment); return assignment; } - } - // if our queue cannot access this node, just return - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + // if our queue cannot access this node, just return + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY + && !accessibleToPartition(node.getPartition())) { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); + return CSAssignment.NULL_ASSIGNMENT; + } + + // Check if this queue need more resource, simply skip allocation if this + // queue doesn't need more resources. + if (!hasPendingResourceRequest(node.getPartition(), clusterResource, + schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-partition=" + node .getPartition()); - return CSAssignment.NULL_ASSIGNMENT; - } + } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); + return CSAssignment.NULL_ASSIGNMENT; + } - // Check if this queue need more resource, simply skip allocation if this - // queue doesn't need more resources. - if (!hasPendingResourceRequest(node.getPartition(), clusterResource, - schedulingMode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); + for (Iterator assignmentIterator = + orderingPolicy.getAssignmentIterator(); + assignmentIterator.hasNext(); ) { + FiCaSchedulerApp application = assignmentIterator.next(); + + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); + + // Check queue max-capacity limit + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + currentResourceLimits, application.getCurrentReservation(), + schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } + + Resource userLimit = computeUserLimitAndSetHeadroom(application, + clusterResource, node.getPartition(), schedulingMode); + + // Check user limit + if (!canAssignToUser(clusterResource, application.getUser(), userLimit, + application, node.getPartition(), currentResourceLimits)) { + application.updateAMContainerDiagnostics(AMState.ACTIVATED, + "User capacity has reached its maximum limit."); + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); + continue; + } + + // Try to schedule + CSAssignment assignment = application.assignContainers(clusterResource, + node, currentResourceLimits, schedulingMode, null); + + if (LOG.isDebugEnabled()) { + LOG.debug("post-assignContainers for application " + application + .getApplicationId()); + application.showRequests(); + } + + // Did we schedule or reserve a container? + Resource assigned = assignment.getResource(); + + handleExcessReservedContainer(clusterResource, assignment, node, + application); + killToPreemptContainers(clusterResource, node, assignment); + + if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, + Resources.none())) { + // Get reserved or allocated container from application + RMContainer reservedOrAllocatedRMContainer = + application.getRMContainer(assignment.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId()); + + // Book-keeping + // Note: Update headroom to account for current allocation too... + allocateResource(clusterResource, application, assigned, + node.getPartition(), reservedOrAllocatedRMContainer, + assignment.isIncreasedAllocation()); + + // Update reserved metrics + Resource reservedRes = + assignment.getAssignmentInformation().getReserved(); + if (reservedRes != null && !reservedRes.equals(Resources.none())) { + incReservedResource(node.getPartition(), reservedRes); + } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + + // Done + return assignment; + } else if (assignment.getSkippedType() + == CSAssignment.SkippedType.OTHER) { + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); + application.updateNodeInfoForAMDiagnostics(node); + } else if (assignment.getSkippedType() + == CSAssignment.SkippedType.QUEUE_LIMIT) { + return assignment; + } else{ + // If we don't allocate anything, and it is not skipped by application, + // we will return to respect FIFO of applications + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.RESPECT_FIFO); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } } ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); + ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } finally { + writeLock.unlock(); } - - for (Iterator assignmentIterator = - orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) { - FiCaSchedulerApp application = assignmentIterator.next(); - - ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, - node.getNodeID(), SystemClock.getInstance().getTime(), application); - - // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - currentResourceLimits, application.getCurrentReservation(), - schedulingMode)) { - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, - application, application.getPriority(), - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - return CSAssignment.NULL_ASSIGNMENT; - } - - Resource userLimit = - computeUserLimitAndSetHeadroom(application, clusterResource, - node.getPartition(), schedulingMode); - - // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, node.getPartition(), currentResourceLimits)) { - application.updateAMContainerDiagnostics(AMState.ACTIVATED, - "User capacity has reached its maximum limit."); - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, - application, application.getPriority(), - ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); - continue; - } - - // Try to schedule - CSAssignment assignment = - application.assignContainers(clusterResource, node, - currentResourceLimits, schedulingMode, null); - - if (LOG.isDebugEnabled()) { - LOG.debug("post-assignContainers for application " - + application.getApplicationId()); - application.showRequests(); - } - - // Did we schedule or reserve a container? - Resource assigned = assignment.getResource(); - - handleExcessReservedContainer(clusterResource, assignment, node, - application); - killToPreemptContainers(clusterResource, node, assignment); - - if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, - Resources.none())) { - // Get reserved or allocated container from application - RMContainer reservedOrAllocatedRMContainer = - application.getRMContainer(assignment.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId()); - - // Book-keeping - // Note: Update headroom to account for current allocation too... - allocateResource(clusterResource, application, assigned, - node.getPartition(), reservedOrAllocatedRMContainer, - assignment.isIncreasedAllocation()); - - // Update reserved metrics - Resource reservedRes = assignment.getAssignmentInformation() - .getReserved(); - if (reservedRes != null && !reservedRes.equals(Resources.none())) { - incReservedResource(node.getPartition(), reservedRes); - } - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED, - ActivityDiagnosticConstant.EMPTY); - - // Done - return assignment; - } else if (assignment.getSkippedType() - == CSAssignment.SkippedType.OTHER) { - ActivitiesLogger.APP.finishSkippedAppAllocationRecording( - activitiesManager, application.getApplicationId(), - ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); - application.updateNodeInfoForAMDiagnostics(node); - } else if(assignment.getSkippedType() - == CSAssignment.SkippedType.QUEUE_LIMIT) { - return assignment; - } else { - // If we don't allocate anything, and it is not skipped by application, - // we will return to respect FIFO of applications - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.RESPECT_FIFO); - ActivitiesLogger.APP.finishSkippedAppAllocationRecording( - activitiesManager, application.getApplicationId(), - ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); - return CSAssignment.NULL_ASSIGNMENT; - } - } - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - - return CSAssignment.NULL_ASSIGNMENT; } protected Resource getHeadroom(User user, Resource queueCurrentLimit, @@ -1116,7 +1205,8 @@ public class LeafQueue extends AbstractCSQueue { } } - @Lock({LeafQueue.class, FiCaSchedulerApp.class}) + // It doesn't necessarily to hold application's lock here. + @Lock({LeafQueue.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode) { @@ -1288,51 +1378,53 @@ public class LeafQueue extends AbstractCSQueue { } @Private - protected synchronized boolean canAssignToUser(Resource clusterResource, + protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, String nodePartition, ResourceLimits currentResourceLimits) { - User user = getUser(userName); + try { + readLock.lock(); + User user = getUser(userName); - currentResourceLimits.setAmountNeededUnreserve(Resources.none()); + currentResourceLimits.setAmountNeededUnreserve(Resources.none()); - // Note: We aren't considering the current request since there is a fixed - // overhead of the AM, but it's a > check, not a >= check, so... - if (Resources - .greaterThan(resourceCalculator, clusterResource, - user.getUsed(nodePartition), - limit)) { - // if enabled, check to see if could we potentially use this node instead - // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && - nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) { - if (Resources.lessThanOrEqual( - resourceCalculator, - clusterResource, - Resources.subtract(user.getUsed(), - application.getCurrentReservation()), limit)) { + // Note: We aren't considering the current request since there is a fixed + // overhead of the AM, but it's a > check, not a >= check, so... + if (Resources.greaterThan(resourceCalculator, clusterResource, + user.getUsed(nodePartition), limit)) { + // if enabled, check to see if could we potentially use this node instead + // of a reserved node if the application has reserved containers + if (this.reservationsContinueLooking && nodePartition.equals( + CommonNodeLabelsManager.NO_LABEL)) { + if (Resources.lessThanOrEqual(resourceCalculator, clusterResource, + Resources.subtract(user.getUsed(), + application.getCurrentReservation()), limit)) { - if (LOG.isDebugEnabled()) { - LOG.debug("User " + userName + " in queue " + getQueueName() - + " will exceed limit based on reservations - " + " consumed: " - + user.getUsed() + " reserved: " - + application.getCurrentReservation() + " limit: " + limit); + if (LOG.isDebugEnabled()) { + LOG.debug("User " + userName + " in queue " + getQueueName() + + " will exceed limit based on reservations - " + + " consumed: " + user.getUsed() + " reserved: " + application + .getCurrentReservation() + " limit: " + limit); + } + Resource amountNeededToUnreserve = Resources.subtract( + user.getUsed(nodePartition), limit); + // we can only acquire a new container if we unreserve first to + // respect user-limit + currentResourceLimits.setAmountNeededUnreserve( + amountNeededToUnreserve); + return true; } - Resource amountNeededToUnreserve = - Resources.subtract(user.getUsed(nodePartition), limit); - // we can only acquire a new container if we unreserve first to - // respect user-limit - currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve); - return true; } + if (LOG.isDebugEnabled()) { + LOG.debug("User " + userName + " in queue " + getQueueName() + + " will exceed limit - " + " consumed: " + user + .getUsed(nodePartition) + " limit: " + limit); + } + return false; } - if (LOG.isDebugEnabled()) { - LOG.debug("User " + userName + " in queue " + getQueueName() - + " will exceed limit - " + " consumed: " - + user.getUsed(nodePartition) + " limit: " + limit); - } - return false; + return true; + } finally { + readLock.unlock(); } - return true; } @Override @@ -1340,15 +1432,15 @@ public class LeafQueue extends AbstractCSQueue { FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer) { boolean removed = false; Priority priority = null; - - synchronized (this) { + + try { + writeLock.lock(); if (rmContainer.getContainer() != null) { priority = rmContainer.getContainer().getPriority(); } if (null != priority) { - removed = app.unreserve( - rmContainer.getAllocatedSchedulerKey(), node, + removed = app.unreserve(rmContainer.getAllocatedSchedulerKey(), node, rmContainer); } @@ -1359,8 +1451,10 @@ public class LeafQueue extends AbstractCSQueue { releaseResource(clusterResource, app, rmContainer.getReservedResource(), node.getPartition(), rmContainer, true); } + } finally { + writeLock.unlock(); } - + if (removed) { getParent().unreserveIncreasedContainer(clusterResource, app, node, rmContainer); @@ -1387,42 +1481,52 @@ public class LeafQueue extends AbstractCSQueue { } } - private synchronized float calculateUserUsageRatio(Resource clusterResource, + private float calculateUserUsageRatio(Resource clusterResource, String nodePartition) { - Resource resourceByLabel = - labelManager.getResourceByLabel(nodePartition, clusterResource); - float consumed = 0; - User user; - for (Map.Entry entry : users.entrySet()) { - user = entry.getValue(); - consumed += user.resetAndUpdateUsageRatio(resourceCalculator, - resourceByLabel, nodePartition); - } - return consumed; - } - - private synchronized void recalculateQueueUsageRatio(Resource clusterResource, - String nodePartition) { - ResourceUsage queueResourceUsage = this.getQueueResourceUsage(); - - if (nodePartition == null) { - for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(), - queueResourceUsage.getNodePartitionsSet())) { - qUsageRatios.setUsageRatio(partition, - calculateUserUsageRatio(clusterResource, partition)); + try { + writeLock.lock(); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); + float consumed = 0; + User user; + for (Map.Entry entry : users.entrySet()) { + user = entry.getValue(); + consumed += user.resetAndUpdateUsageRatio(resourceCalculator, + resourceByLabel, nodePartition); } - } else { - qUsageRatios.setUsageRatio(nodePartition, - calculateUserUsageRatio(clusterResource, nodePartition)); + return consumed; + } finally { + writeLock.unlock(); } } - private synchronized void updateQueueUsageRatio(String nodePartition, + private void recalculateQueueUsageRatio(Resource clusterResource, + String nodePartition) { + try { + writeLock.lock(); + ResourceUsage queueResourceUsage = this.getQueueResourceUsage(); + + if (nodePartition == null) { + for (String partition : Sets.union( + queueCapacities.getNodePartitionsSet(), + queueResourceUsage.getNodePartitionsSet())) { + qUsageRatios.setUsageRatio(partition, + calculateUserUsageRatio(clusterResource, partition)); + } + } else{ + qUsageRatios.setUsageRatio(nodePartition, + calculateUserUsageRatio(clusterResource, nodePartition)); + } + } finally { + writeLock.unlock(); + } + } + + private void updateQueueUsageRatio(String nodePartition, float delta) { qUsageRatios.incUsageRatio(nodePartition, delta); } - @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, @@ -1445,21 +1549,20 @@ public class LeafQueue extends AbstractCSQueue { boolean removed = false; // Careful! Locking order is important! - synchronized (this) { - + try { + writeLock.lock(); Container container = rmContainer.getContainer(); // Inform the application & the node // Note: It's safe to assume that all state changes to RMContainer - // happen under scheduler's lock... + // happen under scheduler's lock... // So, this is, in effect, a transaction across application & node if (rmContainer.getState() == RMContainerState.RESERVED) { removed = application.unreserve(rmContainer.getReservedSchedulerKey(), node, rmContainer); - } else { - removed = - application.containerCompleted(rmContainer, containerStatus, - event, node.getPartition()); + } else{ + removed = application.containerCompleted(rmContainer, containerStatus, + event, node.getPartition()); node.releaseContainer(container); } @@ -1469,12 +1572,15 @@ public class LeafQueue extends AbstractCSQueue { // Inform the ordering policy orderingPolicy.containerReleased(application, rmContainer); - + releaseResource(clusterResource, application, container.getResource(), node.getPartition(), rmContainer, false); } + } finally { + writeLock.unlock(); } + if (removed) { // Inform the parent queue _outside_ of the leaf-queue lock getParent().completedContainer(clusterResource, application, node, @@ -1487,91 +1593,104 @@ public class LeafQueue extends AbstractCSQueue { new KillableContainer(rmContainer, node.getPartition(), queueName)); } - synchronized void allocateResource(Resource clusterResource, + void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application, Resource resource, String nodePartition, RMContainer rmContainer, boolean isIncreasedAllocation) { - super.allocateResource(clusterResource, resource, nodePartition, - isIncreasedAllocation); - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - clusterResource); - - // handle ignore exclusivity container - if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( - RMNodeLabelsManager.NO_LABEL) - && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { - TreeSet rmContainers = null; - if (null == (rmContainers = - ignorePartitionExclusivityRMContainers.get(nodePartition))) { - rmContainers = new TreeSet<>(); - ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers); + try { + writeLock.lock(); + super.allocateResource(clusterResource, resource, nodePartition, + isIncreasedAllocation); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); + + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals( + RMNodeLabelsManager.NO_LABEL)) { + TreeSet rmContainers = null; + if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get( + nodePartition))) { + rmContainers = new TreeSet<>(); + ignorePartitionExclusivityRMContainers.put(nodePartition, + rmContainers); + } + rmContainers.add(rmContainer); } - rmContainers.add(rmContainer); - } - // Update user metrics - String userName = application.getUser(); - User user = getUser(userName); - user.assignContainer(resource, nodePartition); + // Update user metrics + String userName = application.getUser(); - // Update usage ratios - updateQueueUsageRatio(nodePartition, - user.updateUsageRatio(resourceCalculator, resourceByLabel, - nodePartition)); + // TODO, should use getUser, use this method just to avoid UT failure + // which is caused by wrong invoking order, will fix UT separately + User user = getUserAndAddIfAbsent(userName); - // Note this is a bit unconventional since it gets the object and modifies - // it here, rather then using set routine - Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + - " user=" + userName + - " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + - " headroom = " + application.getHeadroom() + - " user-resources=" + user.getUsed() - ); + user.assignContainer(resource, nodePartition); + + // Update usage ratios + updateQueueUsageRatio(nodePartition, + user.updateUsageRatio(resourceCalculator, resourceByLabel, + nodePartition)); + + // Note this is a bit unconventional since it gets the object and modifies + // it here, rather then using set routine + Resources.subtractFrom(application.getHeadroom(), resource); // headroom + metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); + + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage + .getUsed() + " numContainers=" + numContainers + " headroom = " + + application.getHeadroom() + " user-resources=" + user.getUsed()); + } + } finally { + writeLock.unlock(); } } - synchronized void releaseResource(Resource clusterResource, + void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource, String nodePartition, RMContainer rmContainer, boolean isChangeResource) { - super.releaseResource(clusterResource, resource, nodePartition, - isChangeResource); - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - clusterResource); - - // handle ignore exclusivity container - if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( - RMNodeLabelsManager.NO_LABEL) - && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { - if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) { - Set rmContainers = - ignorePartitionExclusivityRMContainers.get(nodePartition); - rmContainers.remove(rmContainer); - if (rmContainers.isEmpty()) { - ignorePartitionExclusivityRMContainers.remove(nodePartition); + try { + writeLock.lock(); + super.releaseResource(clusterResource, resource, nodePartition, + isChangeResource); + Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, + clusterResource); + + // handle ignore exclusivity container + if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( + RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals( + RMNodeLabelsManager.NO_LABEL)) { + if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) { + Set rmContainers = + ignorePartitionExclusivityRMContainers.get(nodePartition); + rmContainers.remove(rmContainer); + if (rmContainers.isEmpty()) { + ignorePartitionExclusivityRMContainers.remove(nodePartition); + } } } - } - // Update user metrics - String userName = application.getUser(); - User user = getUser(userName); - user.releaseContainer(resource, nodePartition); + // Update user metrics + String userName = application.getUser(); + User user = getUserAndAddIfAbsent(userName); + user.releaseContainer(resource, nodePartition); - // Update usage ratios - updateQueueUsageRatio(nodePartition, - user.updateUsageRatio(resourceCalculator, resourceByLabel, - nodePartition)); + // Update usage ratios + updateQueueUsageRatio(nodePartition, + user.updateUsageRatio(resourceCalculator, resourceByLabel, + nodePartition)); - metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); + metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() + - " used=" + queueUsage.getUsed() + " numContainers=" + numContainers + - " user=" + userName + " user-resources=" + user.getUsed()); + if (LOG.isDebugEnabled()) { + LOG.debug( + getQueueName() + " used=" + queueUsage.getUsed() + " numContainers=" + + numContainers + " user=" + userName + " user-resources=" + + user.getUsed()); + } + } finally { + writeLock.unlock(); } } @@ -1596,35 +1715,38 @@ public class LeafQueue extends AbstractCSQueue { } @Override - public synchronized void updateClusterResource(Resource clusterResource, + public void updateClusterResource(Resource clusterResource, ResourceLimits currentResourceLimits) { - updateCurrentResourceLimits(currentResourceLimits, clusterResource); - lastClusterResource = clusterResource; - - // Update headroom info based on new cluster resource value - // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity - // during allocation - setQueueResourceLimitsInfo(clusterResource); + try { + writeLock.lock(); + updateCurrentResourceLimits(currentResourceLimits, clusterResource); + lastClusterResource = clusterResource; - // Update user consumedRatios - recalculateQueueUsageRatio(clusterResource, null); + // Update headroom info based on new cluster resource value + // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity + // during allocation + setQueueResourceLimitsInfo(clusterResource); - // Update metrics - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); + // Update user consumedRatios + recalculateQueueUsageRatio(clusterResource, null); - // queue metrics are updated, more resource may be available - // activate the pending applications if possible - activateApplications(); + // Update metrics + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); - // Update application properties - for (FiCaSchedulerApp application : - orderingPolicy.getSchedulableEntities()) { - synchronized (application) { + // queue metrics are updated, more resource may be available + // activate the pending applications if possible + activateApplications(); + + // Update application properties + for (FiCaSchedulerApp application : orderingPolicy + .getSchedulableEntities()) { computeUserLimitAndSetHeadroom(application, clusterResource, RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); } + } finally { + writeLock.unlock(); } } @@ -1721,30 +1843,47 @@ public class LeafQueue extends AbstractCSQueue { public static class User { ResourceUsage userResourceUsage = new ResourceUsage(); volatile Resource userResourceLimit = Resource.newInstance(0, 0); - int pendingApplications = 0; - int activeApplications = 0; + volatile int pendingApplications = 0; + volatile int activeApplications = 0; private UsageRatios userUsageRatios = new UsageRatios(); + private WriteLock writeLock; + + User() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + // Nobody uses read-lock now, will add it when necessary + writeLock = lock.writeLock(); + } public ResourceUsage getResourceUsage() { return userResourceUsage; } - public synchronized float resetAndUpdateUsageRatio( + public float resetAndUpdateUsageRatio( ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { - userUsageRatios.setUsageRatio(nodePartition, 0); - return updateUsageRatio(resourceCalculator, resource, nodePartition); + try { + writeLock.lock(); + userUsageRatios.setUsageRatio(nodePartition, 0); + return updateUsageRatio(resourceCalculator, resource, nodePartition); + } finally { + writeLock.unlock(); + } } - public synchronized float updateUsageRatio( + public float updateUsageRatio( ResourceCalculator resourceCalculator, Resource resource, String nodePartition) { - float delta; - float newRatio = - Resources.ratio(resourceCalculator, getUsed(nodePartition), resource); - delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); - userUsageRatios.setUsageRatio(nodePartition, newRatio); - return delta; + try { + writeLock.lock(); + float delta; + float newRatio = Resources.ratio(resourceCalculator, + getUsed(nodePartition), resource); + delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); + userUsageRatios.setUsageRatio(nodePartition, newRatio); + return delta; + } finally { + writeLock.unlock(); + } } public Resource getUsed() { @@ -1779,21 +1918,35 @@ public class LeafQueue extends AbstractCSQueue { return getPendingApplications() + getActiveApplications(); } - public synchronized void submitApplication() { - ++pendingApplications; + public void submitApplication() { + try { + writeLock.lock(); + ++pendingApplications; + } finally { + writeLock.unlock(); + } } - public synchronized void activateApplication() { - --pendingApplications; - ++activeApplications; + public void activateApplication() { + try { + writeLock.lock(); + --pendingApplications; + ++activeApplications; + } finally { + writeLock.unlock(); + } } - public synchronized void finishApplication(boolean wasActive) { - if (wasActive) { - --activeApplications; - } - else { - --pendingApplications; + public void finishApplication(boolean wasActive) { + try { + writeLock.lock(); + if (wasActive) { + --activeApplications; + } else{ + --pendingApplications; + } + } finally { + writeLock.unlock(); } } @@ -1820,13 +1973,19 @@ public class LeafQueue extends AbstractCSQueue { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } - // Careful! Locking order is important! - synchronized (this) { - FiCaSchedulerNode node = - scheduler.getNode(rmContainer.getContainer().getNodeId()); - allocateResource(clusterResource, attempt, rmContainer.getContainer() - .getResource(), node.getPartition(), rmContainer, false); + + // Careful! Locking order is important! + try { + writeLock.lock(); + FiCaSchedulerNode node = scheduler.getNode( + rmContainer.getContainer().getNodeId()); + allocateResource(clusterResource, attempt, + rmContainer.getContainer().getResource(), node.getPartition(), + rmContainer, false); + } finally { + writeLock.unlock(); } + getParent().recoverContainer(clusterResource, attempt, rmContainer); } @@ -1850,44 +2009,56 @@ public class LeafQueue extends AbstractCSQueue { // Total pending for the queue = // sum(for each user(min((user's headroom), sum(user's pending requests)))) // NOTE: Used for calculating pedning resources in the preemption monitor. - public synchronized Resource getTotalPendingResourcesConsideringUserLimit( + public Resource getTotalPendingResourcesConsideringUserLimit( Resource resources, String partition) { - Map userNameToHeadroom = new HashMap(); - Resource pendingConsideringUserLimit = Resource.newInstance(0, 0); - for (FiCaSchedulerApp app : getApplications()) { - String userName = app.getUser(); - if (!userNameToHeadroom.containsKey(userName)) { - User user = getUser(userName); - Resource headroom = Resources.subtract( - computeUserLimit(app, resources, user, partition, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), - user.getUsed(partition)); - // Make sure headroom is not negative. - headroom = Resources.componentwiseMax(headroom, Resources.none()); - userNameToHeadroom.put(userName, headroom); + try { + readLock.lock(); + Map userNameToHeadroom = + new HashMap<>(); + Resource pendingConsideringUserLimit = Resource.newInstance(0, 0); + for (FiCaSchedulerApp app : getApplications()) { + String userName = app.getUser(); + if (!userNameToHeadroom.containsKey(userName)) { + User user = getUser(userName); + Resource headroom = Resources.subtract( + computeUserLimit(app, resources, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + user.getUsed(partition)); + // Make sure headroom is not negative. + headroom = Resources.componentwiseMax(headroom, Resources.none()); + userNameToHeadroom.put(userName, headroom); + } + Resource minpendingConsideringUserLimit = Resources.componentwiseMin( + userNameToHeadroom.get(userName), + app.getAppAttemptResourceUsage().getPending(partition)); + Resources.addTo(pendingConsideringUserLimit, + minpendingConsideringUserLimit); + Resources.subtractFrom(userNameToHeadroom.get(userName), + minpendingConsideringUserLimit); } - Resource minpendingConsideringUserLimit = - Resources.componentwiseMin(userNameToHeadroom.get(userName), - app.getAppAttemptResourceUsage().getPending(partition)); - Resources.addTo(pendingConsideringUserLimit, - minpendingConsideringUserLimit); - Resources.subtractFrom( - userNameToHeadroom.get(userName), minpendingConsideringUserLimit); + return pendingConsideringUserLimit; + } finally { + readLock.unlock(); } - return pendingConsideringUserLimit; + } @Override - public synchronized void collectSchedulerApplications( + public void collectSchedulerApplications( Collection apps) { - for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy - .getSchedulableEntities()) { - apps.add(pendingApp.getApplicationAttemptId()); - } - for (FiCaSchedulerApp app : - orderingPolicy.getSchedulableEntities()) { - apps.add(app.getApplicationAttemptId()); + try { + readLock.lock(); + for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy + .getSchedulableEntities()) { + apps.add(pendingApp.getApplicationAttemptId()); + } + for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) { + apps.add(app.getApplicationAttemptId()); + } + } finally { + readLock.unlock(); } + } @Override @@ -1928,13 +2099,24 @@ public class LeafQueue extends AbstractCSQueue { /** * return all ignored partition exclusivity RMContainers in the LeafQueue, this - * will be used by preemption policy, and use of return - * ignorePartitionExclusivityRMContainer should protected by LeafQueue - * synchronized lock + * will be used by preemption policy. */ - public synchronized Map> + public Map> getIgnoreExclusivityRMContainers() { - return ignorePartitionExclusivityRMContainers; + Map> clonedMap = new HashMap<>(); + try { + readLock.lock(); + + for (Map.Entry> entry : ignorePartitionExclusivityRMContainers + .entrySet()) { + clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue())); + } + + return clonedMap; + + } finally { + readLock.unlock(); + } } public void setCapacity(float capacity) { @@ -1949,18 +2131,23 @@ public class LeafQueue extends AbstractCSQueue { this.maxApplications = maxApplications; } - public synchronized OrderingPolicy + public OrderingPolicy getOrderingPolicy() { return orderingPolicy; } - public synchronized void setOrderingPolicy( + void setOrderingPolicy( OrderingPolicy orderingPolicy) { - if (null != this.orderingPolicy) { - orderingPolicy.addAllSchedulableEntities(this.orderingPolicy - .getSchedulableEntities()); + try { + writeLock.lock(); + if (null != this.orderingPolicy) { + orderingPolicy.addAllSchedulableEntities( + this.orderingPolicy.getSchedulableEntities()); + } + this.orderingPolicy = orderingPolicy; + } finally { + writeLock.unlock(); } - this.orderingPolicy = orderingPolicy; } @Override @@ -1988,25 +2175,26 @@ public class LeafQueue extends AbstractCSQueue { boolean resourceDecreased = false; Resource resourceBeforeDecrease; // Grab queue lock to avoid race condition when getting container resource - synchronized (this) { + + try { + writeLock.lock(); // Make sure the decrease request is valid in terms of current resource // and target resource. This must be done under the leaf queue lock. // Throws exception if the check fails. RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false); // Save resource before decrease for debug log - resourceBeforeDecrease = - Resources.clone(rmContainer.getAllocatedResource()); + resourceBeforeDecrease = Resources.clone( + rmContainer.getAllocatedResource()); // Do we have increase request for the same container? If so, remove it - boolean hasIncreaseRequest = - app.removeIncreaseRequest(decreaseRequest.getNodeId(), - decreaseRequest.getRMContainer().getAllocatedSchedulerKey(), - decreaseRequest.getContainerId()); + boolean hasIncreaseRequest = app.removeIncreaseRequest( + decreaseRequest.getNodeId(), + decreaseRequest.getRMContainer().getAllocatedSchedulerKey(), + decreaseRequest.getContainerId()); if (hasIncreaseRequest) { if (LOG.isDebugEnabled()) { LOG.debug("While processing decrease requests, found an increase" - + " request for the same container " - + decreaseRequest.getContainerId() - + ", removed the increase request"); + + " request for the same container " + decreaseRequest + .getContainerId() + ", removed the increase request"); } } // Delta capacity is negative when it's a decrease request @@ -2020,19 +2208,20 @@ public class LeafQueue extends AbstractCSQueue { + " container:" + decreaseRequest.getContainerId() + " ignore this decrease request."); } - } else { + } else{ // Release the delta resource releaseResource(clusterResource, app, absDelta, decreaseRequest.getNodePartition(), - decreaseRequest.getRMContainer(), - true); + decreaseRequest.getRMContainer(), true); // Notify application app.decreaseContainer(decreaseRequest); // Notify node - decreaseRequest.getSchedulerNode() - .decreaseContainer(decreaseRequest.getContainerId(), absDelta); + decreaseRequest.getSchedulerNode().decreaseContainer( + decreaseRequest.getContainerId(), absDelta); resourceDecreased = true; } + } finally { + writeLock.unlock(); } if (resourceDecreased) { @@ -2045,7 +2234,7 @@ public class LeafQueue extends AbstractCSQueue { } } - public synchronized OrderingPolicy + public OrderingPolicy getPendingAppsOrderingPolicy() { return pendingOrderingPolicy; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 3e9785fc927..ffb68928ec9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -107,68 +107,77 @@ public class ParentQueue extends AbstractCSQueue { ", fullname=" + getQueuePath()); } - synchronized void setupQueueConfigs(Resource clusterResource) + void setupQueueConfigs(Resource clusterResource) throws IOException { - super.setupQueueConfigs(clusterResource); - StringBuilder aclsString = new StringBuilder(); - for (Map.Entry e : acls.entrySet()) { - aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); - } - - StringBuilder labelStrBuilder = new StringBuilder(); - if (accessibleLabels != null) { - for (String s : accessibleLabels) { - labelStrBuilder.append(s); - labelStrBuilder.append(","); + try { + writeLock.lock(); + super.setupQueueConfigs(clusterResource); + StringBuilder aclsString = new StringBuilder(); + for (Map.Entry e : acls.entrySet()) { + aclsString.append(e.getKey() + ":" + e.getValue().getAclString()); } - } - LOG.info(queueName + - ", capacity=" + this.queueCapacities.getCapacity() + - ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + - ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + - ", absoluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() + - ", state=" + state + - ", acls=" + aclsString + - ", labels=" + labelStrBuilder.toString() + "\n" + - ", reservationsContinueLooking=" + reservationsContinueLooking); + StringBuilder labelStrBuilder = new StringBuilder(); + if (accessibleLabels != null) { + for (String s : accessibleLabels) { + labelStrBuilder.append(s); + labelStrBuilder.append(","); + } + } + + LOG.info(queueName + ", capacity=" + this.queueCapacities.getCapacity() + + ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() + + ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() + + ", absoluteMaxCapacity=" + this.queueCapacities + .getAbsoluteMaximumCapacity() + ", state=" + state + ", acls=" + + aclsString + ", labels=" + labelStrBuilder.toString() + "\n" + + ", reservationsContinueLooking=" + reservationsContinueLooking); + } finally { + writeLock.unlock(); + } } private static float PRECISION = 0.0005f; // 0.05% precision - synchronized void setChildQueues(Collection childQueues) { - // Validate - float childCapacities = 0; - for (CSQueue queue : childQueues) { - childCapacities += queue.getCapacity(); - } - float delta = Math.abs(1.0f - childCapacities); // crude way to check - // allow capacities being set to 0, and enforce child 0 if parent is 0 - if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || - ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { - throw new IllegalArgumentException("Illegal" + - " capacity of " + childCapacities + - " for children of queue " + queueName); - } - // check label capacities - for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { - float capacityByLabel = queueCapacities.getCapacity(nodeLabel); - // check children's labels - float sum = 0; + + void setChildQueues(Collection childQueues) { + try { + writeLock.lock(); + // Validate + float childCapacities = 0; for (CSQueue queue : childQueues) { - sum += queue.getQueueCapacities().getCapacity(nodeLabel); + childCapacities += queue.getCapacity(); } - if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) - || (capacityByLabel == 0) && (sum > 0)) { - throw new IllegalArgumentException("Illegal" + " capacity of " - + sum + " for children of queue " + queueName - + " for label=" + nodeLabel); + float delta = Math.abs(1.0f - childCapacities); // crude way to check + // allow capacities being set to 0, and enforce child 0 if parent is 0 + if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || ( + (queueCapacities.getCapacity() == 0) && (childCapacities > 0))) { + throw new IllegalArgumentException( + "Illegal" + " capacity of " + childCapacities + + " for children of queue " + queueName); } - } - - this.childQueues.clear(); - this.childQueues.addAll(childQueues); - if (LOG.isDebugEnabled()) { - LOG.debug("setChildQueues: " + getChildQueuesToPrint()); + // check label capacities + for (String nodeLabel : queueCapacities.getExistingNodeLabels()) { + float capacityByLabel = queueCapacities.getCapacity(nodeLabel); + // check children's labels + float sum = 0; + for (CSQueue queue : childQueues) { + sum += queue.getQueueCapacities().getCapacity(nodeLabel); + } + if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION) + || (capacityByLabel == 0) && (sum > 0)) { + throw new IllegalArgumentException( + "Illegal" + " capacity of " + sum + " for children of queue " + + queueName + " for label=" + nodeLabel); + } + } + + this.childQueues.clear(); + this.childQueues.addAll(childQueues); + if (LOG.isDebugEnabled()) { + LOG.debug("setChildQueues: " + getChildQueuesToPrint()); + } + } finally { + writeLock.unlock(); } } @@ -179,53 +188,70 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized QueueInfo getQueueInfo( + public QueueInfo getQueueInfo( boolean includeChildQueues, boolean recursive) { - QueueInfo queueInfo = getQueueInfo(); + try { + readLock.lock(); + QueueInfo queueInfo = getQueueInfo(); - List childQueuesInfo = new ArrayList(); - if (includeChildQueues) { - for (CSQueue child : childQueues) { - // Get queue information recursively? - childQueuesInfo.add( - child.getQueueInfo(recursive, recursive)); + List childQueuesInfo = new ArrayList<>(); + if (includeChildQueues) { + for (CSQueue child : childQueues) { + // Get queue information recursively? + childQueuesInfo.add(child.getQueueInfo(recursive, recursive)); + } } + queueInfo.setChildQueues(childQueuesInfo); + + return queueInfo; + } finally { + readLock.unlock(); } - queueInfo.setChildQueues(childQueuesInfo); - - return queueInfo; + } - private synchronized QueueUserACLInfo getUserAclInfo( + private QueueUserACLInfo getUserAclInfo( UserGroupInformation user) { - QueueUserACLInfo userAclInfo = - recordFactory.newRecordInstance(QueueUserACLInfo.class); - List operations = new ArrayList(); - for (QueueACL operation : QueueACL.values()) { - if (hasAccess(operation, user)) { - operations.add(operation); - } + try { + readLock.lock(); + QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance( + QueueUserACLInfo.class); + List operations = new ArrayList(); + for (QueueACL operation : QueueACL.values()) { + if (hasAccess(operation, user)) { + operations.add(operation); + } + } + + userAclInfo.setQueueName(getQueueName()); + userAclInfo.setUserAcls(operations); + return userAclInfo; + } finally { + readLock.unlock(); } - userAclInfo.setQueueName(getQueueName()); - userAclInfo.setUserAcls(operations); - return userAclInfo; } @Override - public synchronized List getQueueUserAclInfo( + public List getQueueUserAclInfo( UserGroupInformation user) { - List userAcls = new ArrayList(); - - // Add parent queue acls - userAcls.add(getUserAclInfo(user)); - - // Add children queue acls - for (CSQueue child : childQueues) { - userAcls.addAll(child.getQueueUserAclInfo(user)); + try { + readLock.lock(); + List userAcls = new ArrayList<>(); + + // Add parent queue acls + userAcls.add(getUserAclInfo(user)); + + // Add children queue acls + for (CSQueue child : childQueues) { + userAcls.addAll(child.getQueueUserAclInfo(user)); + } + + return userAcls; + } finally { + readLock.unlock(); } - - return userAcls; + } public String toString() { @@ -240,52 +266,59 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void reinitialize(CSQueue newlyParsedQueue, + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof ParentQueue) || - !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + newlyParsedQueue.getQueuePath()); - } - - ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue; - - // Set new configs - setupQueueConfigs(clusterResource); - - // Re-configure existing child queues and add new ones - // The CS has already checked to ensure all existing child queues are present! - Map currentChildQueues = getQueues(childQueues); - Map newChildQueues = - getQueues(newlyParsedParentQueue.childQueues); - for (Map.Entry e : newChildQueues.entrySet()) { - String newChildQueueName = e.getKey(); - CSQueue newChildQueue = e.getValue(); - - CSQueue childQueue = currentChildQueues.get(newChildQueueName); - - // Check if the child-queue already exists - if (childQueue != null) { - // Re-init existing child queues - childQueue.reinitialize(newChildQueue, clusterResource); - LOG.info(getQueueName() + ": re-configured queue: " + childQueue); - } else { - // New child queue, do not re-init - - // Set parent to 'this' - newChildQueue.setParent(this); - - // Save in list of current child queues - currentChildQueues.put(newChildQueueName, newChildQueue); - - LOG.info(getQueueName() + ": added new child queue: " + newChildQueue); + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof ParentQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); } - } - // Re-sort all queues - childQueues.clear(); - childQueues.addAll(currentChildQueues.values()); + ParentQueue newlyParsedParentQueue = (ParentQueue) newlyParsedQueue; + + // Set new configs + setupQueueConfigs(clusterResource); + + // Re-configure existing child queues and add new ones + // The CS has already checked to ensure all existing child queues are present! + Map currentChildQueues = getQueues(childQueues); + Map newChildQueues = getQueues( + newlyParsedParentQueue.childQueues); + for (Map.Entry e : newChildQueues.entrySet()) { + String newChildQueueName = e.getKey(); + CSQueue newChildQueue = e.getValue(); + + CSQueue childQueue = currentChildQueues.get(newChildQueueName); + + // Check if the child-queue already exists + if (childQueue != null) { + // Re-init existing child queues + childQueue.reinitialize(newChildQueue, clusterResource); + LOG.info(getQueueName() + ": re-configured queue: " + childQueue); + } else{ + // New child queue, do not re-init + + // Set parent to 'this' + newChildQueue.setParent(this); + + // Save in list of current child queues + currentChildQueues.put(newChildQueueName, newChildQueue); + + LOG.info( + getQueueName() + ": added new child queue: " + newChildQueue); + } + } + + // Re-sort all queues + childQueues.clear(); + childQueues.addAll(currentChildQueues.values()); + } finally { + writeLock.unlock(); + } } Map getQueues(Set queues) { @@ -299,21 +332,24 @@ public class ParentQueue extends AbstractCSQueue { @Override public void submitApplication(ApplicationId applicationId, String user, String queue) throws AccessControlException { - - synchronized (this) { + + try { + writeLock.lock(); // Sanity check if (queue.equals(queueName)) { - throw new AccessControlException("Cannot submit application " + - "to non-leaf queue: " + queueName); + throw new AccessControlException( + "Cannot submit application " + "to non-leaf queue: " + queueName); } - + if (state != QueueState.RUNNING) { - throw new AccessControlException("Queue " + getQueuePath() + - " is STOPPED. Cannot accept submission of application: " + - applicationId); + throw new AccessControlException("Queue " + getQueuePath() + + " is STOPPED. Cannot accept submission of application: " + + applicationId); } addApplication(applicationId, user); + } finally { + writeLock.unlock(); } // Inform the parent queue @@ -342,24 +378,26 @@ public class ParentQueue extends AbstractCSQueue { // finish attempt logic. } - private synchronized void addApplication(ApplicationId applicationId, + private void addApplication(ApplicationId applicationId, String user) { - ++numApplications; + try { + writeLock.lock(); + ++numApplications; - LOG.info("Application added -" + - " appId: " + applicationId + - " user: " + user + - " leaf-queue of parent: " + getQueueName() + - " #applications: " + getNumApplications()); + LOG.info( + "Application added -" + " appId: " + applicationId + " user: " + user + + " leaf-queue of parent: " + getQueueName() + " #applications: " + + getNumApplications()); + } finally { + writeLock.unlock(); + } } @Override public void finishApplication(ApplicationId application, String user) { - - synchronized (this) { - removeApplication(application, user); - } + + removeApplication(application, user); // Inform the parent queue if (parent != null) { @@ -367,16 +405,18 @@ public class ParentQueue extends AbstractCSQueue { } } - private synchronized void removeApplication(ApplicationId applicationId, + private void removeApplication(ApplicationId applicationId, String user) { - - --numApplications; + try { + writeLock.lock(); + --numApplications; - LOG.info("Application removed -" + - " appId: " + applicationId + - " user: " + user + - " leaf-queue of parent: " + getQueueName() + - " #applications: " + getNumApplications()); + LOG.info("Application removed -" + " appId: " + applicationId + " user: " + + user + " leaf-queue of parent: " + getQueueName() + + " #applications: " + getNumApplications()); + } finally { + writeLock.unlock(); + } } private String getParentName() { @@ -384,183 +424,181 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized CSAssignment assignContainers(Resource clusterResource, + public CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, SchedulingMode schedulingMode) { - // if our queue cannot access this node, just return - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY - && !accessibleToPartition(node.getPartition())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it is not able to access partition=" + node - .getPartition()); - } - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.REJECTED, - ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + try { + writeLock.lock(); + // if our queue cannot access this node, just return + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY + && !accessibleToPartition(node.getPartition())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it is not able to access partition=" + node .getPartition()); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - - return CSAssignment.NULL_ASSIGNMENT; - } - - // Check if this queue need more resource, simply skip allocation if this - // queue doesn't need more resources. - if (!super.hasPendingResourceRequest(node.getPartition(), - clusterResource, schedulingMode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skip this queue=" + getQueuePath() - + ", because it doesn't need more resource, schedulingMode=" - + schedulingMode.name() + " node-partition=" + node.getPartition()); - } - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } - - return CSAssignment.NULL_ASSIGNMENT; - } - - CSAssignment assignment = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - - while (canAssign(clusterResource, node)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to assign containers to child-queue of " - + getQueueName()); - } - - // Are we over maximum-capacity for this queue? - // This will also consider parent's limits and also continuous reservation - // looking - if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - resourceLimits, Resources.createResource( - getMetrics().getReservedMB(), getMetrics() - .getReservedVirtualCores()), schedulingMode)) { + } ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + getParentName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); if (rootQueue) { ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, node); } - break; + return CSAssignment.NULL_ASSIGNMENT; } - // Schedule - CSAssignment assignedToChild = - assignContainersToChildQueues(clusterResource, node, resourceLimits, - schedulingMode); - assignment.setType(assignedToChild.getType()); - - // Done if no child-queue assigned anything - if (Resources.greaterThan( - resourceCalculator, clusterResource, - assignedToChild.getResource(), Resources.none())) { - - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.ACCEPTED, - ActivityDiagnosticConstant.EMPTY); - - if (node.getReservedContainer() == null) { - if (rootQueue) { - ActivitiesLogger.NODE.finishAllocatedNodeAllocation( - activitiesManager, node, - assignedToChild.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId(), - AllocationState.ALLOCATED); - } - } else { - if (rootQueue) { - ActivitiesLogger.NODE.finishAllocatedNodeAllocation( - activitiesManager, node, - assignedToChild.getAssignmentInformation() - .getFirstAllocatedOrReservedContainerId(), - AllocationState.RESERVED); - } + // Check if this queue need more resource, simply skip allocation if this + // queue doesn't need more resources. + if (!super.hasPendingResourceRequest(node.getPartition(), clusterResource, + schedulingMode)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip this queue=" + getQueuePath() + + ", because it doesn't need more resource, schedulingMode=" + + schedulingMode.name() + " node-partition=" + node + .getPartition()); } - // Track resource utilization for the parent-queue - allocateResource(clusterResource, assignedToChild.getResource(), - node.getPartition(), assignedToChild.isIncreasedAllocation()); - - // Track resource utilization in this pass of the scheduler - Resources - .addTo(assignment.getResource(), assignedToChild.getResource()); - Resources.addTo(assignment.getAssignmentInformation().getAllocated(), - assignedToChild.getAssignmentInformation().getAllocated()); - Resources.addTo(assignment.getAssignmentInformation().getReserved(), - assignedToChild.getAssignmentInformation().getReserved()); - assignment.getAssignmentInformation().incrAllocations( - assignedToChild.getAssignmentInformation().getNumAllocations()); - assignment.getAssignmentInformation().incrReservations( - assignedToChild.getAssignmentInformation().getNumReservations()); - assignment - .getAssignmentInformation() - .getAllocationDetails() - .addAll( - assignedToChild.getAssignmentInformation().getAllocationDetails()); - assignment - .getAssignmentInformation() - .getReservationDetails() - .addAll( + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + + return CSAssignment.NULL_ASSIGNMENT; + } + + CSAssignment assignment = new CSAssignment(Resources.createResource(0, 0), + NodeType.NODE_LOCAL); + + while (canAssign(clusterResource, node)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to assign containers to child-queue of " + + getQueueName()); + } + + // Are we over maximum-capacity for this queue? + // This will also consider parent's limits and also continuous reservation + // looking + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + resourceLimits, Resources + .createResource(getMetrics().getReservedMB(), + getMetrics().getReservedVirtualCores()), schedulingMode)) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + + break; + } + + // Schedule + CSAssignment assignedToChild = assignContainersToChildQueues( + clusterResource, node, resourceLimits, schedulingMode); + assignment.setType(assignedToChild.getType()); + + // Done if no child-queue assigned anything + if (Resources.greaterThan(resourceCalculator, clusterResource, + assignedToChild.getResource(), Resources.none())) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + + if (node.getReservedContainer() == null) { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.ALLOCATED); + } + } else{ + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.RESERVED); + } + } + + // Track resource utilization for the parent-queue + allocateResource(clusterResource, assignedToChild.getResource(), + node.getPartition(), assignedToChild.isIncreasedAllocation()); + + // Track resource utilization in this pass of the scheduler + Resources.addTo(assignment.getResource(), + assignedToChild.getResource()); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + assignedToChild.getAssignmentInformation().getAllocated()); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + assignedToChild.getAssignmentInformation().getReserved()); + assignment.getAssignmentInformation().incrAllocations( + assignedToChild.getAssignmentInformation().getNumAllocations()); + assignment.getAssignmentInformation().incrReservations( + assignedToChild.getAssignmentInformation().getNumReservations()); + assignment.getAssignmentInformation().getAllocationDetails().addAll( + assignedToChild.getAssignmentInformation() + .getAllocationDetails()); + assignment.getAssignmentInformation().getReservationDetails().addAll( assignedToChild.getAssignmentInformation() .getReservationDetails()); - assignment.setIncreasedAllocation(assignedToChild - .isIncreasedAllocation()); - - LOG.info("assignedContainer" + - " queue=" + getQueueName() + - " usedCapacity=" + getUsedCapacity() + - " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + - " used=" + queueUsage.getUsed() + - " cluster=" + clusterResource); + assignment.setIncreasedAllocation( + assignedToChild.isIncreasedAllocation()); - } else { - assignment.setSkippedType(assignedToChild.getSkippedType()); + LOG.info("assignedContainer" + " queue=" + getQueueName() + + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + + " cluster=" + clusterResource); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParentName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - if (rootQueue) { - ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, - node); - } + } else{ + assignment.setSkippedType(assignedToChild.getSkippedType()); - break; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("ParentQ=" + getQueueName() - + " assignedSoFarInThisIteration=" + assignment.getResource() - + " usedCapacity=" + getUsedCapacity() - + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity()); - } - - // Do not assign more than one container if this isn't the root queue - // or if we've already assigned an off-switch container - if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) { - if (LOG.isDebugEnabled()) { - if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) { - LOG.debug("Not assigning more than one off-switch container," + - " assignments so far: " + assignment); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); } + + break; + } + + if (LOG.isDebugEnabled()) { + LOG.debug( + "ParentQ=" + getQueueName() + " assignedSoFarInThisIteration=" + + assignment.getResource() + " usedCapacity=" + + getUsedCapacity() + " absoluteUsedCapacity=" + + getAbsoluteUsedCapacity()); + } + + // Do not assign more than one container if this isn't the root queue + // or if we've already assigned an off-switch container + if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) { + if (LOG.isDebugEnabled()) { + if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) { + LOG.debug("Not assigning more than one off-switch container," + + " assignments so far: " + assignment); + } + } + break; } - break; } - } - - return assignment; + + return assignment; + } finally { + writeLock.unlock(); + } } private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) { @@ -628,7 +666,7 @@ public class ParentQueue extends AbstractCSQueue { return childrenList.iterator(); } - private synchronized CSAssignment assignContainersToChildQueues( + private CSAssignment assignContainersToChildQueues( Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, SchedulingMode schedulingMode) { CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; @@ -717,39 +755,45 @@ public class ParentQueue extends AbstractCSQueue { } } - private synchronized void internalReleaseResource(Resource clusterResource, + private void internalReleaseResource(Resource clusterResource, FiCaSchedulerNode node, Resource releasedResource, boolean changeResource, CSQueue completedChildQueue, boolean sortQueues) { - super.releaseResource(clusterResource, - releasedResource, node.getPartition(), - changeResource); + try { + writeLock.lock(); + super.releaseResource(clusterResource, releasedResource, + node.getPartition(), changeResource); - if (LOG.isDebugEnabled()) { - LOG.debug("completedContainer " + this + ", cluster=" + clusterResource); - } + if (LOG.isDebugEnabled()) { + LOG.debug( + "completedContainer " + this + ", cluster=" + clusterResource); + } - // Note that this is using an iterator on the childQueues so this can't - // be called if already within an iterator for the childQueues. Like - // from assignContainersToChildQueues. - if (sortQueues) { - // reinsert the updated queue - for (Iterator iter = childQueues.iterator(); iter.hasNext();) { - CSQueue csqueue = iter.next(); - if (csqueue.equals(completedChildQueue)) { - iter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Re-sorting completed queue: " + csqueue); + // Note that this is using an iterator on the childQueues so this can't + // be called if already within an iterator for the childQueues. Like + // from assignContainersToChildQueues. + if (sortQueues) { + // reinsert the updated queue + for (Iterator iter = childQueues.iterator(); + iter.hasNext(); ) { + CSQueue csqueue = iter.next(); + if (csqueue.equals(completedChildQueue)) { + iter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Re-sorting completed queue: " + csqueue); + } + childQueues.add(csqueue); + break; } - childQueues.add(csqueue); - break; } } - } - // If we skipped sort queue this time, we need to resort queues to make - // sure we allocate from least usage (or order defined by queue policy) - // queues. - needToResortQueuesAtNextAllocation = !sortQueues; + // If we skipped sort queue this time, we need to resort queues to make + // sure we allocate from least usage (or order defined by queue policy) + // queues. + needToResortQueuesAtNextAllocation = !sortQueues; + } finally { + writeLock.unlock(); + } } @Override @@ -806,24 +850,35 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void updateClusterResource(Resource clusterResource, + public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits) { - // Update all children - for (CSQueue childQueue : childQueues) { - // Get ResourceLimits of child queue before assign containers - ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, - clusterResource, resourceLimits.getLimit(), - RMNodeLabelsManager.NO_LABEL); - childQueue.updateClusterResource(clusterResource, childLimits); + try { + writeLock.lock(); + // Update all children + for (CSQueue childQueue : childQueues) { + // Get ResourceLimits of child queue before assign containers + ResourceLimits childLimits = getResourceLimitsOfChild(childQueue, + clusterResource, resourceLimits.getLimit(), + RMNodeLabelsManager.NO_LABEL); + childQueue.updateClusterResource(clusterResource, childLimits); + } + + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); + } finally { + writeLock.unlock(); } - - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); } @Override - public synchronized List getChildQueues() { - return new ArrayList(childQueues); + public List getChildQueues() { + try { + readLock.lock(); + return new ArrayList(childQueues); + } finally { + readLock.unlock(); + } + } @Override @@ -832,13 +887,18 @@ public class ParentQueue extends AbstractCSQueue { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } - // Careful! Locking order is important! - synchronized (this) { - FiCaSchedulerNode node = - scheduler.getNode(rmContainer.getContainer().getNodeId()); + + // Careful! Locking order is important! + try { + writeLock.lock(); + FiCaSchedulerNode node = scheduler.getNode( + rmContainer.getContainer().getNodeId()); allocateResource(clusterResource, rmContainer.getContainer().getResource(), node.getPartition(), false); + } finally { + writeLock.unlock(); } + if (parent != null) { parent.recoverContainer(clusterResource, attempt, rmContainer); } @@ -851,11 +911,17 @@ public class ParentQueue extends AbstractCSQueue { } @Override - public synchronized void collectSchedulerApplications( + public void collectSchedulerApplications( Collection apps) { - for (CSQueue queue : childQueues) { - queue.collectSchedulerApplications(apps); + try { + readLock.lock(); + for (CSQueue queue : childQueues) { + queue.collectSchedulerApplications(apps); + } + } finally { + readLock.unlock(); } + } @Override @@ -897,44 +963,49 @@ public class ParentQueue extends AbstractCSQueue { } } - public synchronized int getNumApplications() { + public int getNumApplications() { return numApplications; } - synchronized void allocateResource(Resource clusterResource, + void allocateResource(Resource clusterResource, Resource resource, String nodePartition, boolean changeContainerResource) { - super.allocateResource(clusterResource, resource, nodePartition, - changeContainerResource); + try { + writeLock.lock(); + super.allocateResource(clusterResource, resource, nodePartition, + changeContainerResource); - /** - * check if we need to kill (killable) containers if maximum resource violated. - * Doing this because we will deduct killable resource when going from root. - * For example: - *
-     *      Root
-     *      /   \
-     *     a     b
-     *   /  \
-     *  a1  a2
-     * 
- * - * a: max=10G, used=10G, killable=2G - * a1: used=8G, killable=2G - * a2: used=2G, pending=2G, killable=0G - * - * When we get queue-a to allocate resource, even if queue-a - * reaches its max resource, we deduct its used by killable, so we can allocate - * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G. - * - * If scheduler finds a 2G available resource in existing cluster, and assigns it - * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G - * - * When this happens, we have to preempt killable container (on same or different - * nodes) of parent queue to avoid violating parent's max resource. - */ - if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition) - < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) { - killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); + /** + * check if we need to kill (killable) containers if maximum resource violated. + * Doing this because we will deduct killable resource when going from root. + * For example: + *
+       *      Root
+       *      /   \
+       *     a     b
+       *   /  \
+       *  a1  a2
+       * 
+ * + * a: max=10G, used=10G, killable=2G + * a1: used=8G, killable=2G + * a2: used=2G, pending=2G, killable=0G + * + * When we get queue-a to allocate resource, even if queue-a + * reaches its max resource, we deduct its used by killable, so we can allocate + * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G. + * + * If scheduler finds a 2G available resource in existing cluster, and assigns it + * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G + * + * When this happens, we have to preempt killable container (on same or different + * nodes) of parent queue to avoid violating parent's max resource. + */ + if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition) + < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) { + killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); + } + } finally { + writeLock.unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 7b53ad5e497..a391f25fbae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -79,76 +79,98 @@ public class PlanQueue extends ParentQueue { } @Override - public synchronized void reinitialize(CSQueue newlyParsedQueue, + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof PlanQueue) - || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() - + " from " + newlyParsedQueue.getQueuePath()); - } + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } - PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; + PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue; - if (newlyParsedParentQueue.getChildQueues().size() > 0) { - throw new IOException( - "Reservable Queue should not have sub-queues in the" - + "configuration"); - } + if (newlyParsedParentQueue.getChildQueues().size() > 0) { + throw new IOException( + "Reservable Queue should not have sub-queues in the" + + "configuration"); + } - // Set new configs - setupQueueConfigs(clusterResource); + // Set new configs + setupQueueConfigs(clusterResource); - updateQuotas(newlyParsedParentQueue.userLimit, - newlyParsedParentQueue.userLimitFactor, - newlyParsedParentQueue.maxAppsForReservation, - newlyParsedParentQueue.maxAppsPerUserForReservation); + updateQuotas(newlyParsedParentQueue.userLimit, + newlyParsedParentQueue.userLimitFactor, + newlyParsedParentQueue.maxAppsForReservation, + newlyParsedParentQueue.maxAppsPerUserForReservation); - // run reinitialize on each existing queue, to trigger absolute cap - // recomputations - for (CSQueue res : this.getChildQueues()) { - res.reinitialize(res, clusterResource); - } - showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues; - } - - synchronized void addChildQueue(CSQueue newQueue) - throws SchedulerDynamicEditException { - if (newQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException("Queue " + newQueue - + " being added has non zero capacity."); - } - boolean added = this.childQueues.add(newQueue); - if (LOG.isDebugEnabled()) { - LOG.debug("updateChildQueues (action: add queue): " + added + " " - + getChildQueuesToPrint()); + // run reinitialize on each existing queue, to trigger absolute cap + // recomputations + for (CSQueue res : this.getChildQueues()) { + res.reinitialize(res, clusterResource); + } + showReservationsAsQueues = + newlyParsedParentQueue.showReservationsAsQueues; + } finally { + writeLock.unlock(); } } - synchronized void removeChildQueue(CSQueue remQueue) + void addChildQueue(CSQueue newQueue) throws SchedulerDynamicEditException { - if (remQueue.getCapacity() > 0) { - throw new SchedulerDynamicEditException("Queue " + remQueue - + " being removed has non zero capacity."); + try { + writeLock.lock(); + if (newQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + newQueue + " being added has non zero capacity."); + } + boolean added = this.childQueues.add(newQueue); + if (LOG.isDebugEnabled()) { + LOG.debug("updateChildQueues (action: add queue): " + added + " " + + getChildQueuesToPrint()); + } + } finally { + writeLock.unlock(); } - Iterator qiter = childQueues.iterator(); - while (qiter.hasNext()) { - CSQueue cs = qiter.next(); - if (cs.equals(remQueue)) { - qiter.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed child queue: {}", cs.getQueueName()); + } + + void removeChildQueue(CSQueue remQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + if (remQueue.getCapacity() > 0) { + throw new SchedulerDynamicEditException( + "Queue " + remQueue + " being removed has non zero capacity."); + } + Iterator qiter = childQueues.iterator(); + while (qiter.hasNext()) { + CSQueue cs = qiter.next(); + if (cs.equals(remQueue)) { + qiter.remove(); + if (LOG.isDebugEnabled()) { + LOG.debug("Removed child queue: {}", cs.getQueueName()); + } } } + } finally { + writeLock.unlock(); } } - protected synchronized float sumOfChildCapacities() { - float ret = 0; - for (CSQueue l : childQueues) { - ret += l.getCapacity(); + protected float sumOfChildCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } finally { + writeLock.unlock(); } - return ret; } private void updateQuotas(int userLimit, float userLimitFactor, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index 976cf8cf740..faeb37e8f89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -51,22 +51,28 @@ public class ReservationQueue extends LeafQueue { } @Override - public synchronized void reinitialize(CSQueue newlyParsedQueue, + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { - // Sanity check - if (!(newlyParsedQueue instanceof ReservationQueue) - || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { - throw new IOException("Trying to reinitialize " + getQueuePath() - + " from " + newlyParsedQueue.getQueuePath()); - } - super.reinitialize(newlyParsedQueue, clusterResource); - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - minimumAllocation, this, labelManager, null); + try { + writeLock.lock(); + // Sanity check + if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + super.reinitialize(newlyParsedQueue, clusterResource); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); - updateQuotas(parent.getUserLimitForReservation(), - parent.getUserLimitFactor(), - parent.getMaxApplicationsForReservations(), - parent.getMaxApplicationsPerUserForReservation()); + updateQuotas(parent.getUserLimitForReservation(), + parent.getUserLimitFactor(), + parent.getMaxApplicationsForReservations(), + parent.getMaxApplicationsPerUserForReservation()); + } finally { + writeLock.unlock(); + } } /** @@ -77,21 +83,26 @@ public class ReservationQueue extends LeafQueue { * maxCapacity, etc..) * @throws SchedulerDynamicEditException */ - public synchronized void setEntitlement(QueueEntitlement entitlement) + public void setEntitlement(QueueEntitlement entitlement) throws SchedulerDynamicEditException { - float capacity = entitlement.getCapacity(); - if (capacity < 0 || capacity > 1.0f) { - throw new SchedulerDynamicEditException( - "Capacity demand is not in the [0,1] range: " + capacity); - } - setCapacity(capacity); - setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); - // note: we currently set maxCapacity to capacity - // this might be revised later - setMaxCapacity(entitlement.getMaxCapacity()); - if (LOG.isDebugEnabled()) { - LOG.debug("successfully changed to " + capacity + " for queue " - + this.getQueueName()); + try { + writeLock.lock(); + float capacity = entitlement.getCapacity(); + if (capacity < 0 || capacity > 1.0f) { + throw new SchedulerDynamicEditException( + "Capacity demand is not in the [0,1] range: " + capacity); + } + setCapacity(capacity); + setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity()); + // note: we currently set maxCapacity to capacity + // this might be revised later + setMaxCapacity(entitlement.getMaxCapacity()); + if (LOG.isDebugEnabled()) { + LOG.debug("successfully changed to " + capacity + " for queue " + this + .getQueueName()); + } + } finally { + writeLock.unlock(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 6fba22a9287..26146301d4d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -828,8 +828,8 @@ public class TestContainerResizing { app.getAppAttemptResourceUsage().getPending().getMemorySize()); // Queue/user/application's usage will be updated checkUsedResource(rm1, "default", 0 * GB, null); - Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default")) - .getUser("user").getUsed().getMemorySize()); + // User will be removed + Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user")); Assert.assertEquals(0 * GB, app.getAppAttemptResourceUsage().getReserved().getMemorySize()); Assert.assertEquals(0 * GB,