YARN-9780. SchedulerConf Mutation API does not Allow Stop and Remove Queue in a single call. Contributed by Prabhu Joseph
This commit is contained in:
parent
c71befaf8f
commit
4627dd6708
|
@ -177,7 +177,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
csContext.getRMContext().getHAServiceState()
|
csContext.getRMContext().getHAServiceState()
|
||||||
!= HAServiceProtocol.HAServiceState.STANDBY) {
|
!= HAServiceProtocol.HAServiceState.STANDBY) {
|
||||||
// Ensure queue hierarchy in the new XML file is proper.
|
// Ensure queue hierarchy in the new XML file is proper.
|
||||||
validateQueueHierarchy(queues, newQueues);
|
validateQueueHierarchy(queues, newQueues, newConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add new queues and delete OldQeueus only after validation.
|
// Add new queues and delete OldQeueus only after validation.
|
||||||
|
@ -309,7 +309,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
* @param newQueues new queues
|
* @param newQueues new queues
|
||||||
*/
|
*/
|
||||||
private void validateQueueHierarchy(Map<String, CSQueue> queues,
|
private void validateQueueHierarchy(Map<String, CSQueue> queues,
|
||||||
Map<String, CSQueue> newQueues) throws IOException {
|
Map<String, CSQueue> newQueues, CapacitySchedulerConfiguration newConf)
|
||||||
|
throws IOException {
|
||||||
// check that all static queues are included in the newQueues list
|
// check that all static queues are included in the newQueues list
|
||||||
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
|
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
|
||||||
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
|
if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
|
||||||
|
@ -319,7 +320,18 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
||||||
CSQueue newQueue = newQueues.get(queueName);
|
CSQueue newQueue = newQueues.get(queueName);
|
||||||
if (null == newQueue) {
|
if (null == newQueue) {
|
||||||
// old queue doesn't exist in the new XML
|
// old queue doesn't exist in the new XML
|
||||||
if (oldQueue.getState() == QueueState.STOPPED) {
|
String configPrefix = newConf.getQueuePrefix(
|
||||||
|
oldQueue.getQueuePath());
|
||||||
|
QueueState newQueueState = null;
|
||||||
|
try {
|
||||||
|
newQueueState = QueueState.valueOf(
|
||||||
|
newConf.get(configPrefix + "state"));
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.warn("Not a valid queue state for queue "
|
||||||
|
+ oldQueue.getQueuePath());
|
||||||
|
}
|
||||||
|
if (oldQueue.getState() == QueueState.STOPPED ||
|
||||||
|
newQueueState == QueueState.STOPPED) {
|
||||||
LOG.info("Deleting Queue " + queueName + ", as it is not"
|
LOG.info("Deleting Queue " + queueName + ", as it is not"
|
||||||
+ " present in the modified capacity configuration xml");
|
+ " present in the modified capacity configuration xml");
|
||||||
} else{
|
} else{
|
||||||
|
|
|
@ -451,6 +451,38 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||||
.put(ClientResponse.class);
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
CapacitySchedulerConfiguration newCSConf =
|
||||||
|
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||||
|
assertEquals("Failed to remove the queue",
|
||||||
|
1, newCSConf.getQueues("root.a").length);
|
||||||
|
assertEquals("Failed to remove the right queue",
|
||||||
|
"a1", newCSConf.getQueues("root.a")[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStopWithRemoveQueue() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
|
||||||
|
ClientResponse response;
|
||||||
|
|
||||||
|
// Set state of queues to STOPPED.
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> stoppedParam = new HashMap<>();
|
||||||
|
stoppedParam.put(CapacitySchedulerConfiguration.STATE,
|
||||||
|
QueueState.STOPPED.toString());
|
||||||
|
QueueConfigInfo stoppedInfo = new QueueConfigInfo("root.a.a2",
|
||||||
|
stoppedParam);
|
||||||
|
updateInfo.getUpdateQueueInfo().add(stoppedInfo);
|
||||||
|
|
||||||
|
updateInfo.getRemoveQueueInfo().add("root.a.a2");
|
||||||
|
response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("scheduler-conf").queryParam("user.name", userName)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(YarnWebServiceUtils.toJson(updateInfo,
|
||||||
|
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
CapacitySchedulerConfiguration newCSConf =
|
CapacitySchedulerConfiguration newCSConf =
|
||||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||||
|
|
Loading…
Reference in New Issue