YARN-6109. Add an ability to convert ChildQueue to ParentQueue. (Xuan Gong via wangda)
This commit is contained in:
parent
18432130a7
commit
3fdae0a2b6
|
@ -264,6 +264,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
/**
|
/**
|
||||||
* Ensure all existing queues are present. Queues cannot be deleted if its not
|
* Ensure all existing queues are present. Queues cannot be deleted if its not
|
||||||
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
|
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
|
||||||
|
* Previous child queue could be converted into parent queue if it is in
|
||||||
|
* STOPPED state.
|
||||||
*
|
*
|
||||||
* @param queues existing queues
|
* @param queues existing queues
|
||||||
* @param newQueues new queues
|
* @param newQueues new queues
|
||||||
|
@ -292,6 +294,17 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
throw new IOException(queueName + " is moved from:"
|
throw new IOException(queueName + " is moved from:"
|
||||||
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
|
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
|
||||||
+ " after refresh, which is not allowed.");
|
+ " after refresh, which is not allowed.");
|
||||||
|
} else if (oldQueue instanceof LeafQueue
|
||||||
|
&& newQueue instanceof ParentQueue) {
|
||||||
|
if (oldQueue.getState() == QueueState.STOPPED) {
|
||||||
|
LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
|
||||||
|
+ " to parent queue.");
|
||||||
|
} else {
|
||||||
|
throw new IOException("Can not convert the leaf queue: "
|
||||||
|
+ oldQueue.getQueuePath() + " to parent queue since "
|
||||||
|
+ "it is not yet in stopped state. Current State : "
|
||||||
|
+ oldQueue.getState());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -315,7 +315,22 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// Check if the child-queue already exists
|
// Check if the child-queue already exists
|
||||||
if (childQueue != null) {
|
if (childQueue != null) {
|
||||||
// Re-init existing child queues
|
// Check if the child-queue has been converted into parent queue.
|
||||||
|
// The CS has already checked to ensure that this child-queue is in
|
||||||
|
// STOPPED state.
|
||||||
|
if (childQueue instanceof LeafQueue
|
||||||
|
&& newChildQueue instanceof ParentQueue) {
|
||||||
|
// We would convert this LeafQueue to ParentQueue, consider this
|
||||||
|
// as the combination of DELETE then ADD.
|
||||||
|
newChildQueue.setParent(this);
|
||||||
|
currentChildQueues.put(newChildQueueName, newChildQueue);
|
||||||
|
// inform CapacitySchedulerQueueManager
|
||||||
|
CapacitySchedulerQueueManager queueManager = this.csContext
|
||||||
|
.getCapacitySchedulerQueueManager();
|
||||||
|
queueManager.addQueue(newChildQueueName, newChildQueue);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// Re-init existing queues
|
||||||
childQueue.reinitialize(newChildQueue, clusterResource);
|
childQueue.reinitialize(newChildQueue, clusterResource);
|
||||||
LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
|
LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
|
||||||
} else{
|
} else{
|
||||||
|
|
|
@ -472,6 +472,52 @@ public class TestCapacityScheduler {
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param conf, to be modified
|
||||||
|
* @return, CS configuration which has converted b1 to parent queue
|
||||||
|
* root
|
||||||
|
* / \
|
||||||
|
* a b
|
||||||
|
* / \ / | \
|
||||||
|
* a1 a2 b1 b2 b3
|
||||||
|
* |
|
||||||
|
* b11
|
||||||
|
*/
|
||||||
|
private CapacitySchedulerConfiguration
|
||||||
|
setupQueueConfigurationWithB1AsParentQueue(
|
||||||
|
CapacitySchedulerConfiguration conf) {
|
||||||
|
|
||||||
|
// Define top-level queues
|
||||||
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] { "a", "b" });
|
||||||
|
|
||||||
|
conf.setCapacity(A, A_CAPACITY);
|
||||||
|
conf.setCapacity(B, B_CAPACITY);
|
||||||
|
|
||||||
|
// Define 2nd-level queues
|
||||||
|
conf.setQueues(A, new String[] { "a1", "a2" });
|
||||||
|
conf.setCapacity(A1, A1_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(A1, 100.0f);
|
||||||
|
conf.setCapacity(A2, A2_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(A2, 100.0f);
|
||||||
|
|
||||||
|
conf.setQueues(B, new String[] {"b1","b2", "b3"});
|
||||||
|
conf.setCapacity(B1, B1_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(B1, 100.0f);
|
||||||
|
conf.setCapacity(B2, B2_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(B2, 100.0f);
|
||||||
|
conf.setCapacity(B3, B3_CAPACITY);
|
||||||
|
conf.setUserLimitFactor(B3, 100.0f);
|
||||||
|
|
||||||
|
// Set childQueue for B1
|
||||||
|
conf.setQueues(B1, new String[] {"b11"});
|
||||||
|
String B11 = B1 + ".b11";
|
||||||
|
conf.setCapacity(B11, 100.0f);
|
||||||
|
conf.setUserLimitFactor(B11, 100.0f);
|
||||||
|
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param conf, to be modified
|
* @param conf, to be modified
|
||||||
* @return, CS configuration which has deleted a
|
* @return, CS configuration which has deleted a
|
||||||
|
@ -4142,4 +4188,61 @@ public class TestCapacityScheduler {
|
||||||
|
|
||||||
cs.stop();
|
cs.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if we can convert a leaf queue to a parent queue
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test (timeout = 10000)
|
||||||
|
public void testConvertLeafQueueToParentQueue() throws Exception {
|
||||||
|
CapacityScheduler cs = new CapacityScheduler();
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
|
||||||
|
null, new RMContainerTokenSecretManager(conf),
|
||||||
|
new NMTokenSecretManagerInRM(conf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM(), null);
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
cs.setConf(new YarnConfiguration());
|
||||||
|
cs.setRMContext(resourceManager.getRMContext());
|
||||||
|
cs.init(conf);
|
||||||
|
cs.start();
|
||||||
|
cs.reinitialize(conf, rmContext);
|
||||||
|
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
|
String targetQueue = "b1";
|
||||||
|
CSQueue b1 = cs.getQueue(targetQueue);
|
||||||
|
Assert.assertEquals(b1.getState(), QueueState.RUNNING);
|
||||||
|
|
||||||
|
// test if we can convert a leaf queue which is in RUNNING state
|
||||||
|
conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfigurationWithB1AsParentQueue(conf);
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
fail("Expected to throw exception when refresh queue tries to convert"
|
||||||
|
+ " a child queue to a parent queue.");
|
||||||
|
} catch (IOException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
|
||||||
|
// now set queue state for b1 to STOPPED
|
||||||
|
conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(conf);
|
||||||
|
conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
Assert.assertEquals(b1.getState(), QueueState.STOPPED);
|
||||||
|
|
||||||
|
// test if we can convert a leaf queue which is in STOPPED state
|
||||||
|
conf = new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfigurationWithB1AsParentQueue(conf);
|
||||||
|
try {
|
||||||
|
cs.reinitialize(conf, mockContext);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("Expected to NOT throw exception when refresh queue tries"
|
||||||
|
+ " to convert a leaf queue WITHOUT running apps");
|
||||||
|
}
|
||||||
|
b1 = cs.getQueue(targetQueue);
|
||||||
|
Assert.assertTrue(b1 instanceof ParentQueue);
|
||||||
|
Assert.assertEquals(b1.getState(), QueueState.RUNNING);
|
||||||
|
Assert.assertTrue(!b1.getChildQueues().isEmpty());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue