From 3fdae0a2b66c5fb6853875b66fcf50bc96d6e2e9 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 4 Apr 2017 14:38:45 -0700 Subject: [PATCH] YARN-6109. Add an ability to convert ChildQueue to ParentQueue. (Xuan Gong via wangda) --- .../CapacitySchedulerQueueManager.java | 13 +++ .../scheduler/capacity/ParentQueue.java | 17 ++- .../capacity/TestCapacityScheduler.java | 103 ++++++++++++++++++ 3 files changed, 132 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 76cb5d6aac6..c92c3436412 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -264,6 +264,8 @@ static CSQueue parseQueue( /** * Ensure all existing queues are present. Queues cannot be deleted if its not * in Stopped state, Queue's cannot be moved from one hierarchy to other also. + * Previous child queue could be converted into parent queue if it is in + * STOPPED state. * * @param queues existing queues * @param newQueues new queues @@ -292,6 +294,17 @@ private void validateQueueHierarchy(Map queues, throw new IOException(queueName + " is moved from:" + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() + " after refresh, which is not allowed."); + } else if (oldQueue instanceof LeafQueue + && newQueue instanceof ParentQueue) { + if (oldQueue.getState() == QueueState.STOPPED) { + LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() + + " to parent queue."); + } else { + throw new IOException("Can not convert the leaf queue: " + + oldQueue.getQueuePath() + " to parent queue since " + + "it is not yet in stopped state. Current State : " + + oldQueue.getState()); + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index f84b7a42dbf..15794720fc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -315,7 +315,22 @@ public void reinitialize(CSQueue newlyParsedQueue, // Check if the child-queue already exists if (childQueue != null) { - // Re-init existing child queues + // Check if the child-queue has been converted into parent queue. + // The CS has already checked to ensure that this child-queue is in + // STOPPED state. + if (childQueue instanceof LeafQueue + && newChildQueue instanceof ParentQueue) { + // We would convert this LeafQueue to ParentQueue, consider this + // as the combination of DELETE then ADD. + newChildQueue.setParent(this); + currentChildQueues.put(newChildQueueName, newChildQueue); + // inform CapacitySchedulerQueueManager + CapacitySchedulerQueueManager queueManager = this.csContext + .getCapacitySchedulerQueueManager(); + queueManager.addQueue(newChildQueueName, newChildQueue); + continue; + } + // Re-init existing queues childQueue.reinitialize(newChildQueue, clusterResource); LOG.info(getQueueName() + ": re-configured queue: " + childQueue); } else{ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ff0f7cfc377..e2f456ca334 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -472,6 +472,52 @@ private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB1( return conf; } + /** + * @param conf, to be modified + * @return, CS configuration which has converted b1 to parent queue + * root + * / \ + * a b + * / \ / | \ + * a1 a2 b1 b2 b3 + * | + * b11 + */ + private CapacitySchedulerConfiguration + setupQueueConfigurationWithB1AsParentQueue( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[] {"b1","b2", "b3"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + // Set childQueue for B1 + conf.setQueues(B1, new String[] {"b11"}); + String B11 = B1 + ".b11"; + conf.setCapacity(B11, 100.0f); + conf.setUserLimitFactor(B11, 100.0f); + + return conf; + } + /** * @param conf, to be modified * @return, CS configuration which has deleted a @@ -4142,4 +4188,61 @@ null, new RMContainerTokenSecretManager(conf), cs.stop(); } + + /** + * Test if we can convert a leaf queue to a parent queue + * @throws Exception + */ + @Test (timeout = 10000) + public void testConvertLeafQueueToParentQueue() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + setupQueueConfiguration(conf); + cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, rmContext); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + String targetQueue = "b1"; + CSQueue b1 = cs.getQueue(targetQueue); + Assert.assertEquals(b1.getState(), QueueState.RUNNING); + + // test if we can convert a leaf queue which is in RUNNING state + conf = new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithB1AsParentQueue(conf); + try { + cs.reinitialize(conf, mockContext); + fail("Expected to throw exception when refresh queue tries to convert" + + " a child queue to a parent queue."); + } catch (IOException e) { + // ignore + } + + // now set queue state for b1 to STOPPED + conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED"); + cs.reinitialize(conf, mockContext); + Assert.assertEquals(b1.getState(), QueueState.STOPPED); + + // test if we can convert a leaf queue which is in STOPPED state + conf = new CapacitySchedulerConfiguration(); + setupQueueConfigurationWithB1AsParentQueue(conf); + try { + cs.reinitialize(conf, mockContext); + } catch (IOException e) { + fail("Expected to NOT throw exception when refresh queue tries" + + " to convert a leaf queue WITHOUT running apps"); + } + b1 = cs.getQueue(targetQueue); + Assert.assertTrue(b1 instanceof ParentQueue); + Assert.assertEquals(b1.getState(), QueueState.RUNNING); + Assert.assertTrue(!b1.getChildQueues().isEmpty()); + } }