apps) {
- for (CSQueue queue : childQueues) {
- queue.collectSchedulerApplications(apps);
+ try {
+ readLock.lock();
+ for (CSQueue queue : childQueues) {
+ queue.collectSchedulerApplications(apps);
+ }
+ } finally {
+ readLock.unlock();
}
+
}
@Override
@@ -897,44 +963,49 @@ public class ParentQueue extends AbstractCSQueue {
}
}
- public synchronized int getNumApplications() {
+ public int getNumApplications() {
return numApplications;
}
- synchronized void allocateResource(Resource clusterResource,
+ void allocateResource(Resource clusterResource,
Resource resource, String nodePartition, boolean changeContainerResource) {
- super.allocateResource(clusterResource, resource, nodePartition,
- changeContainerResource);
+ try {
+ writeLock.lock();
+ super.allocateResource(clusterResource, resource, nodePartition,
+ changeContainerResource);
- /**
- * check if we need to kill (killable) containers if maximum resource violated.
- * Doing this because we will deduct killable resource when going from root.
- * For example:
- *
- * Root
- * / \
- * a b
- * / \
- * a1 a2
- *
- *
- * a: max=10G, used=10G, killable=2G
- * a1: used=8G, killable=2G
- * a2: used=2G, pending=2G, killable=0G
- *
- * When we get queue-a to allocate resource, even if queue-a
- * reaches its max resource, we deduct its used by killable, so we can allocate
- * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G.
- *
- * If scheduler finds a 2G available resource in existing cluster, and assigns it
- * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G
- *
- * When this happens, we have to preempt killable container (on same or different
- * nodes) of parent queue to avoid violating parent's max resource.
- */
- if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
- < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
- killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
+ /**
+ * check if we need to kill (killable) containers if maximum resource violated.
+ * Doing this because we will deduct killable resource when going from root.
+ * For example:
+ *
+ * Root
+ * / \
+ * a b
+ * / \
+ * a1 a2
+ *
+ *
+ * a: max=10G, used=10G, killable=2G
+ * a1: used=8G, killable=2G
+ * a2: used=2G, pending=2G, killable=0G
+ *
+ * When we get queue-a to allocate resource, even if queue-a
+ * reaches its max resource, we deduct its used by killable, so we can allocate
+ * at most 2G resources. ResourceLimits passed down to a2 has headroom set to 2G.
+ *
+ * If scheduler finds a 2G available resource in existing cluster, and assigns it
+ * to a2, now a2's used= 2G + 2G = 4G, and a's used = 8G + 4G = 12G > 10G
+ *
+ * When this happens, we have to preempt killable container (on same or different
+ * nodes) of parent queue to avoid violating parent's max resource.
+ */
+ if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
+ < getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
+ killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
+ }
+ } finally {
+ writeLock.unlock();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 7b53ad5e497..a391f25fbae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -79,76 +79,98 @@ public class PlanQueue extends ParentQueue {
}
@Override
- public synchronized void reinitialize(CSQueue newlyParsedQueue,
+ public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
- // Sanity check
- if (!(newlyParsedQueue instanceof PlanQueue)
- || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
- throw new IOException("Trying to reinitialize " + getQueuePath()
- + " from " + newlyParsedQueue.getQueuePath());
- }
+ try {
+ writeLock.lock();
+ // Sanity check
+ if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
+ .getQueuePath().equals(getQueuePath())) {
+ throw new IOException(
+ "Trying to reinitialize " + getQueuePath() + " from "
+ + newlyParsedQueue.getQueuePath());
+ }
- PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
+ PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
- if (newlyParsedParentQueue.getChildQueues().size() > 0) {
- throw new IOException(
- "Reservable Queue should not have sub-queues in the"
- + "configuration");
- }
+ if (newlyParsedParentQueue.getChildQueues().size() > 0) {
+ throw new IOException(
+ "Reservable Queue should not have sub-queues in the"
+ + " configuration");
+ }
- // Set new configs
- setupQueueConfigs(clusterResource);
+ // Set new configs
+ setupQueueConfigs(clusterResource);
- updateQuotas(newlyParsedParentQueue.userLimit,
- newlyParsedParentQueue.userLimitFactor,
- newlyParsedParentQueue.maxAppsForReservation,
- newlyParsedParentQueue.maxAppsPerUserForReservation);
+ updateQuotas(newlyParsedParentQueue.userLimit,
+ newlyParsedParentQueue.userLimitFactor,
+ newlyParsedParentQueue.maxAppsForReservation,
+ newlyParsedParentQueue.maxAppsPerUserForReservation);
- // run reinitialize on each existing queue, to trigger absolute cap
- // recomputations
- for (CSQueue res : this.getChildQueues()) {
- res.reinitialize(res, clusterResource);
- }
- showReservationsAsQueues = newlyParsedParentQueue.showReservationsAsQueues;
- }
-
- synchronized void addChildQueue(CSQueue newQueue)
- throws SchedulerDynamicEditException {
- if (newQueue.getCapacity() > 0) {
- throw new SchedulerDynamicEditException("Queue " + newQueue
- + " being added has non zero capacity.");
- }
- boolean added = this.childQueues.add(newQueue);
- if (LOG.isDebugEnabled()) {
- LOG.debug("updateChildQueues (action: add queue): " + added + " "
- + getChildQueuesToPrint());
+ // run reinitialize on each existing queue, to trigger absolute cap
+ // recomputations
+ for (CSQueue res : this.getChildQueues()) {
+ res.reinitialize(res, clusterResource);
+ }
+ showReservationsAsQueues =
+ newlyParsedParentQueue.showReservationsAsQueues;
+ } finally {
+ writeLock.unlock();
}
}
- synchronized void removeChildQueue(CSQueue remQueue)
+ void addChildQueue(CSQueue newQueue)
throws SchedulerDynamicEditException {
- if (remQueue.getCapacity() > 0) {
- throw new SchedulerDynamicEditException("Queue " + remQueue
- + " being removed has non zero capacity.");
+ try {
+ writeLock.lock();
+ if (newQueue.getCapacity() > 0) {
+ throw new SchedulerDynamicEditException(
+ "Queue " + newQueue + " being added has non zero capacity.");
+ }
+ boolean added = this.childQueues.add(newQueue);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("updateChildQueues (action: add queue): " + added + " "
+ + getChildQueuesToPrint());
+ }
+ } finally {
+ writeLock.unlock();
}
- Iterator<CSQueue> qiter = childQueues.iterator();
- while (qiter.hasNext()) {
- CSQueue cs = qiter.next();
- if (cs.equals(remQueue)) {
- qiter.remove();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removed child queue: {}", cs.getQueueName());
+ }
+
+ void removeChildQueue(CSQueue remQueue)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+ if (remQueue.getCapacity() > 0) {
+ throw new SchedulerDynamicEditException(
+ "Queue " + remQueue + " being removed has non zero capacity.");
+ }
+ Iterator<CSQueue> qiter = childQueues.iterator();
+ while (qiter.hasNext()) {
+ CSQueue cs = qiter.next();
+ if (cs.equals(remQueue)) {
+ qiter.remove();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed child queue: {}", cs.getQueueName());
+ }
}
}
+ } finally {
+ writeLock.unlock();
}
}
- protected synchronized float sumOfChildCapacities() {
- float ret = 0;
- for (CSQueue l : childQueues) {
- ret += l.getCapacity();
+ protected float sumOfChildCapacities() {
+ try {
+ writeLock.lock();
+ float ret = 0;
+ for (CSQueue l : childQueues) {
+ ret += l.getCapacity();
+ }
+ return ret;
+ } finally {
+ writeLock.unlock();
}
- return ret;
}
private void updateQuotas(int userLimit, float userLimitFactor,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 976cf8cf740..faeb37e8f89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -51,22 +51,28 @@ public class ReservationQueue extends LeafQueue {
}
@Override
- public synchronized void reinitialize(CSQueue newlyParsedQueue,
+ public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
- // Sanity check
- if (!(newlyParsedQueue instanceof ReservationQueue)
- || !newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
- throw new IOException("Trying to reinitialize " + getQueuePath()
- + " from " + newlyParsedQueue.getQueuePath());
- }
- super.reinitialize(newlyParsedQueue, clusterResource);
- CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
- minimumAllocation, this, labelManager, null);
+ try {
+ writeLock.lock();
+ // Sanity check
+ if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
+ .getQueuePath().equals(getQueuePath())) {
+ throw new IOException(
+ "Trying to reinitialize " + getQueuePath() + " from "
+ + newlyParsedQueue.getQueuePath());
+ }
+ super.reinitialize(newlyParsedQueue, clusterResource);
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+ minimumAllocation, this, labelManager, null);
- updateQuotas(parent.getUserLimitForReservation(),
- parent.getUserLimitFactor(),
- parent.getMaxApplicationsForReservations(),
- parent.getMaxApplicationsPerUserForReservation());
+ updateQuotas(parent.getUserLimitForReservation(),
+ parent.getUserLimitFactor(),
+ parent.getMaxApplicationsForReservations(),
+ parent.getMaxApplicationsPerUserForReservation());
+ } finally {
+ writeLock.unlock();
+ }
}
/**
@@ -77,21 +83,26 @@ public class ReservationQueue extends LeafQueue {
* maxCapacity, etc..)
* @throws SchedulerDynamicEditException
*/
- public synchronized void setEntitlement(QueueEntitlement entitlement)
+ public void setEntitlement(QueueEntitlement entitlement)
throws SchedulerDynamicEditException {
- float capacity = entitlement.getCapacity();
- if (capacity < 0 || capacity > 1.0f) {
- throw new SchedulerDynamicEditException(
- "Capacity demand is not in the [0,1] range: " + capacity);
- }
- setCapacity(capacity);
- setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
- // note: we currently set maxCapacity to capacity
- // this might be revised later
- setMaxCapacity(entitlement.getMaxCapacity());
- if (LOG.isDebugEnabled()) {
- LOG.debug("successfully changed to " + capacity + " for queue "
- + this.getQueueName());
+ try {
+ writeLock.lock();
+ float capacity = entitlement.getCapacity();
+ if (capacity < 0 || capacity > 1.0f) {
+ throw new SchedulerDynamicEditException(
+ "Capacity demand is not in the [0,1] range: " + capacity);
+ }
+ setCapacity(capacity);
+ setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
+ // note: we currently set maxCapacity to capacity
+ // this might be revised later
+ setMaxCapacity(entitlement.getMaxCapacity());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("successfully changed to " + capacity + " for queue " + this
+ .getQueueName());
+ }
+ } finally {
+ writeLock.unlock();
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 6fba22a9287..26146301d4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -828,8 +828,8 @@ public class TestContainerResizing {
app.getAppAttemptResourceUsage().getPending().getMemorySize());
// Queue/user/application's usage will be updated
checkUsedResource(rm1, "default", 0 * GB, null);
- Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
- .getUser("user").getUsed().getMemorySize());
+ // User will be removed
+ Assert.assertNull(((LeafQueue) cs.getQueue("default")).getUser("user"));
Assert.assertEquals(0 * GB,
app.getAppAttemptResourceUsage().getReserved().getMemorySize());
Assert.assertEquals(0 * GB,