diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 215869a2693..1b9392eafb8 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -157,6 +157,9 @@ Release 0.23.5 - UNRELEASED YARN-174. Modify NodeManager to pass the user's configuration even when rebooting. (vinodkv) + YARN-177. CapacityScheduler - adding a queue while the RM is running has + wacky results (acmurthy vai tgraves) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 8a43b2da27b..d21a888cf91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -50,6 +50,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { */ public CSQueue getParent(); + /** + * Set the parent Queue. + * @param newParentQueue new parent queue + */ + public void setParent(CSQueue newParentQueue); + /** * Get the queue name. * @return the queue name @@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { /** * Reinitialize the queue. - * @param queue new queue to re-initalize from + * @param newlyParsedQueue new queue to re-initalize from * @param clusterResource resources in the cluster */ - public void reinitialize(CSQueue queue, Resource clusterResource) + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException; /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index d222b9061a8..614378aebd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -223,7 +223,7 @@ public class LeafQueue implements CSQueue { { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absCapacity = parent.getAbsoluteCapacity() * capacity; + float absCapacity = getParent().getAbsoluteCapacity() * capacity; CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity); this.capacity = capacity; @@ -256,7 +256,7 @@ public class LeafQueue implements CSQueue { // Update metrics CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); LOG.info("Initializing " + queueName + "\n" + "capacity = " + capacity + @@ -339,10 +339,15 @@ public class LeafQueue implements CSQueue { } @Override - public CSQueue getParent() { + public synchronized CSQueue getParent() { return parent; } - + + @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + @Override public String getQueueName() { return queueName; @@ -350,7 +355,7 @@ public class LeafQueue implements CSQueue { @Override public String getQueuePath() { - return parent.getQueuePath() + "." + getQueueName(); + return getParent().getQueuePath() + "." + getQueueName(); } /** @@ -430,7 +435,9 @@ public class LeafQueue implements CSQueue { synchronized void setMaxCapacity(float maximumCapacity) { // Sanity check CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); - float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); + float absMaxCapacity = + CSQueueUtils.computeAbsoluteMaximumCapacity( + maximumCapacity, getParent()); CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity); this.maximumCapacity = maximumCapacity; @@ -453,10 +460,6 @@ public class LeafQueue implements CSQueue { this.userLimitFactor = userLimitFactor; } - synchronized void setParentQueue(CSQueue parent) { - this.parent = parent; - } - @Override public synchronized int getNumApplications() { return getNumPendingApplications() + getNumActiveApplications(); @@ -559,26 +562,28 @@ public class LeafQueue implements CSQueue { } @Override - public synchronized void reinitialize(CSQueue queue, Resource clusterResource) + public synchronized void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { // Sanity check - if (!(queue instanceof LeafQueue) || - !queue.getQueuePath().equals(getQueuePath())) { + if (!(newlyParsedQueue instanceof LeafQueue) || + !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + queue.getQueuePath()); + " from " + newlyParsedQueue.getQueuePath()); } - LeafQueue leafQueue = (LeafQueue)queue; + LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue; setupQueueConfigs( clusterResource, - leafQueue.capacity, leafQueue.absoluteCapacity, - leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity, - leafQueue.userLimit, leafQueue.userLimitFactor, - leafQueue.maxApplications, - leafQueue.getMaxApplicationsPerUser(), - leafQueue.getMaximumActiveApplications(), - leafQueue.getMaximumActiveApplicationsPerUser(), - leafQueue.state, leafQueue.acls); + newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity, + newlyParsedLeafQueue.maximumCapacity, + newlyParsedLeafQueue.absoluteMaxCapacity, + newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor, + newlyParsedLeafQueue.maxApplications, + newlyParsedLeafQueue.getMaxApplicationsPerUser(), + newlyParsedLeafQueue.getMaximumActiveApplications(), + newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), + newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls); } @Override @@ -591,7 +596,7 @@ public class LeafQueue implements CSQueue { } // Check if parent-queue allows access - return parent.hasAccess(acl, user); + return getParent().hasAccess(acl, user); } @Override @@ -649,10 +654,10 @@ public class LeafQueue implements CSQueue { // Inform the parent queue try { - parent.submitApplication(application, userName, queue); + getParent().submitApplication(application, userName, queue); } catch (AccessControlException ace) { LOG.info("Failed to submit application to parent-queue: " + - parent.getQueuePath(), ace); + getParent().getQueuePath(), ace); removeApplication(application, user); throw ace; } @@ -708,7 +713,7 @@ public class LeafQueue implements CSQueue { } // Inform the parent queue - parent.finishApplication(application, queue); + getParent().finishApplication(application, queue); } public synchronized void removeApplication(FiCaSchedulerApp application, User user) { @@ -1351,7 +1356,7 @@ public class LeafQueue implements CSQueue { } // Inform the parent queue - parent.completedContainer(clusterResource, application, + getParent().completedContainer(clusterResource, application, node, rmContainer, null, event); } } @@ -1361,7 +1366,7 @@ public class LeafQueue implements CSQueue { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); ++numContainers; // Update user metrics @@ -1386,7 +1391,7 @@ public class LeafQueue implements CSQueue { // Update queue metrics Resources.subtractFrom(usedResources, resource); CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); --numContainers; // Update user metrics @@ -1417,7 +1422,7 @@ public class LeafQueue implements CSQueue { // Update metrics CSQueueUtils.updateQueueStatistics( - this, parent, clusterResource, minimumAllocation); + this, getParent(), clusterResource, minimumAllocation); // Update application properties for (FiCaSchedulerApp application : activeApplications) { @@ -1488,7 +1493,7 @@ public class LeafQueue implements CSQueue { synchronized (this) { allocateResource(clusterResource, application, container.getResource()); } - parent.recoverContainer(clusterResource, application, container); + getParent().recoverContainer(clusterResource, application, container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 25e982bbc03..75fcbde516c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -60,7 +60,7 @@ public class ParentQueue implements CSQueue { private static final Log LOG = LogFactory.getLog(ParentQueue.class); - private final CSQueue parent; + private CSQueue parent; private final String queueName; private float capacity; @@ -216,10 +216,15 @@ public class ParentQueue implements CSQueue { } @Override - public CSQueue getParent() { + public synchronized CSQueue getParent() { return parent; } + @Override + public synchronized void setParent(CSQueue newParentQueue) { + this.parent = (ParentQueue)newParentQueue; + } + @Override public String getQueueName() { return queueName; @@ -357,37 +362,52 @@ public class ParentQueue implements CSQueue { } @Override - public synchronized void reinitialize(CSQueue queue, Resource clusterResource) + public synchronized void reinitialize( + CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { // Sanity check - if (!(queue instanceof ParentQueue) || - !queue.getQueuePath().equals(getQueuePath())) { + if (!(newlyParsedQueue instanceof ParentQueue) || + !newlyParsedQueue.getQueuePath().equals(getQueuePath())) { throw new IOException("Trying to reinitialize " + getQueuePath() + - " from " + queue.getQueuePath()); + " from " + newlyParsedQueue.getQueuePath()); } - ParentQueue parentQueue = (ParentQueue)queue; + ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue; // Set new configs setupQueueConfigs(clusterResource, - parentQueue.capacity, parentQueue.absoluteCapacity, - parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity, - parentQueue.state, parentQueue.acls); + newlyParsedParentQueue.capacity, + newlyParsedParentQueue.absoluteCapacity, + newlyParsedParentQueue.maximumCapacity, + newlyParsedParentQueue.absoluteMaxCapacity, + newlyParsedParentQueue.state, + newlyParsedParentQueue.acls); // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! Map currentChildQueues = getQueues(childQueues); - Map newChildQueues = getQueues(parentQueue.childQueues); + Map newChildQueues = + getQueues(newlyParsedParentQueue.childQueues); for (Map.Entry e : newChildQueues.entrySet()) { String newChildQueueName = e.getKey(); CSQueue newChildQueue = e.getValue(); CSQueue childQueue = currentChildQueues.get(newChildQueueName); - if (childQueue != null){ + + // Check if the child-queue already exists + if (childQueue != null) { + // Re-init existing child queues childQueue.reinitialize(newChildQueue, clusterResource); LOG.info(getQueueName() + ": re-configured queue: " + childQueue); } else { + // New child queue, do not re-init + + // Set parent to 'this' + newChildQueue.setParent(this); + + // Save in list of current child queues currentChildQueues.put(newChildQueueName, newChildQueue); + LOG.info(getQueueName() + ": added new child queue: " + newChildQueue); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 8a7b89eaf2f..e7af5afcc20 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -378,4 +378,43 @@ public class TestCapacityScheduler { Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory()); } + + @Test + public void testRefreshQueuesWithNewQueue() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.setConf(new YarnConfiguration()); + cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new ClientToAMTokenSecretManagerInRM())); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + // Add a new queue b4 + String B4 = B + ".b4"; + float B4_CAPACITY = 10; + + B3_CAPACITY -= B4_CAPACITY; + try { + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setCapacity(B2, B2_CAPACITY); + conf.setCapacity(B3, B3_CAPACITY); + conf.setCapacity(B4, B4_CAPACITY); + cs.reinitialize(conf,null); + checkQueueCapacities(cs, 80f, 20f); + + // Verify parent for B4 + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueB = findQueue(rootQueue, B); + CSQueue queueB4 = findQueue(queueB, B4); + + assertEquals(queueB, queueB4.getParent()); + } finally { + B3_CAPACITY += B4_CAPACITY; + } + } + }