YARN-9950. Unset Ordering Policy of Leaf/Parent queue converted from Parent/Leaf queue respectively. Contributed by Prabhu Joseph.
This commit is contained in:
parent
eb73ba6ed5
commit
51e7d1b37e
|
@ -42,6 +42,8 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CS configuration provider which implements
|
* CS configuration provider which implements
|
||||||
* {@link MutableConfigurationProvider} for modifying capacity scheduler
|
* {@link MutableConfigurationProvider} for modifying capacity scheduler
|
||||||
|
@ -259,6 +261,13 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
+ CapacitySchedulerConfiguration.QUEUES;
|
+ CapacitySchedulerConfiguration.QUEUES;
|
||||||
if (siblingQueues.size() == 0) {
|
if (siblingQueues.size() == 0) {
|
||||||
confUpdate.put(queuesConfig, null);
|
confUpdate.put(queuesConfig, null);
|
||||||
|
// Unset Ordering Policy of Leaf Queue converted from
|
||||||
|
// Parent Queue after removeQueue
|
||||||
|
String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
|
||||||
|
+ ORDERING_POLICY;
|
||||||
|
proposedConf.unset(queueOrderingPolicy);
|
||||||
|
confUpdate.put(queueOrderingPolicy, null);
|
||||||
} else {
|
} else {
|
||||||
confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
|
confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
|
||||||
}
|
}
|
||||||
|
@ -307,6 +316,14 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider,
|
||||||
}
|
}
|
||||||
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
|
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
|
||||||
}
|
}
|
||||||
|
// Unset Ordering Policy of Parent Queue converted from
|
||||||
|
// Leaf Queue after addQueue
|
||||||
|
String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
|
||||||
|
if (siblingQueues.size() == 1) {
|
||||||
|
proposedConf.unset(queueOrderingPolicy);
|
||||||
|
confUpdate.put(queueOrderingPolicy, null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,7 @@ import java.util.Map;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test scheduler configuration mutation via REST API.
|
* Test scheduler configuration mutation via REST API.
|
||||||
|
@ -333,6 +334,105 @@ public class TestRMWebServicesConfigurationMutation extends JerseyTestBase {
|
||||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnsetParentQueueOrderingPolicy() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response;
|
||||||
|
|
||||||
|
// Update ordering policy of Leaf Queue root.b to fair
|
||||||
|
SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> updateParam = new HashMap<>();
|
||||||
|
updateParam.put(CapacitySchedulerConfiguration.ORDERING_POLICY,
|
||||||
|
"fair");
|
||||||
|
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.b", updateParam);
|
||||||
|
updateInfo1.getUpdateQueueInfo().add(aUpdateInfo);
|
||||||
|
response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("scheduler-conf").queryParam("user.name", userName)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(YarnWebServiceUtils.toJson(updateInfo1,
|
||||||
|
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
CapacitySchedulerConfiguration newCSConf =
|
||||||
|
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||||
|
String bOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ "root.b" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
|
||||||
|
assertEquals("fair", newCSConf.get(bOrderingPolicy));
|
||||||
|
|
||||||
|
stopQueue("root.b");
|
||||||
|
|
||||||
|
// Add root.b.b1 which makes root.b a Parent Queue
|
||||||
|
SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> capacity = new HashMap<>();
|
||||||
|
capacity.put(CapacitySchedulerConfiguration.CAPACITY, "100");
|
||||||
|
QueueConfigInfo b1 = new QueueConfigInfo("root.b.b1", capacity);
|
||||||
|
updateInfo2.getAddQueueInfo().add(b1);
|
||||||
|
response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("scheduler-conf").queryParam("user.name", userName)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(YarnWebServiceUtils.toJson(updateInfo2,
|
||||||
|
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class);
|
||||||
|
|
||||||
|
// Validate unset ordering policy of root.b after converted to
|
||||||
|
// Parent Queue
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
newCSConf = ((CapacityScheduler) rm.getResourceScheduler())
|
||||||
|
.getConfiguration();
|
||||||
|
bOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ "root.b" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
|
||||||
|
assertNull("Failed to unset Parent Queue OrderingPolicy",
|
||||||
|
newCSConf.get(bOrderingPolicy));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnsetLeafQueueOrderingPolicy() throws Exception {
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response;
|
||||||
|
|
||||||
|
// Update ordering policy of Parent Queue root.c to priority-utilization
|
||||||
|
SchedConfUpdateInfo updateInfo1 = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> updateParam = new HashMap<>();
|
||||||
|
updateParam.put(CapacitySchedulerConfiguration.ORDERING_POLICY,
|
||||||
|
"priority-utilization");
|
||||||
|
QueueConfigInfo aUpdateInfo = new QueueConfigInfo("root.c", updateParam);
|
||||||
|
updateInfo1.getUpdateQueueInfo().add(aUpdateInfo);
|
||||||
|
response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("scheduler-conf").queryParam("user.name", userName)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(YarnWebServiceUtils.toJson(updateInfo1,
|
||||||
|
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
CapacitySchedulerConfiguration newCSConf =
|
||||||
|
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||||
|
String cOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ "root.c" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
|
||||||
|
assertEquals("priority-utilization", newCSConf.get(cOrderingPolicy));
|
||||||
|
|
||||||
|
stopQueue("root.c.c1");
|
||||||
|
|
||||||
|
// Remove root.c.c1 which makes root.c a Leaf Queue
|
||||||
|
SchedConfUpdateInfo updateInfo2 = new SchedConfUpdateInfo();
|
||||||
|
updateInfo2.getRemoveQueueInfo().add("root.c.c1");
|
||||||
|
response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("scheduler-conf").queryParam("user.name", userName)
|
||||||
|
.accept(MediaType.APPLICATION_JSON)
|
||||||
|
.entity(YarnWebServiceUtils.toJson(updateInfo2,
|
||||||
|
SchedConfUpdateInfo.class), MediaType.APPLICATION_JSON)
|
||||||
|
.put(ClientResponse.class);
|
||||||
|
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||||
|
|
||||||
|
// Validate unset ordering policy of root.c after converted to
|
||||||
|
// Leaf Queue
|
||||||
|
newCSConf = ((CapacityScheduler) rm.getResourceScheduler())
|
||||||
|
.getConfiguration();
|
||||||
|
cOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ "root.c" + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
|
||||||
|
assertNull("Failed to unset Leaf Queue OrderingPolicy",
|
||||||
|
newCSConf.get(cOrderingPolicy));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveQueue() throws Exception {
|
public void testRemoveQueue() throws Exception {
|
||||||
WebResource r = resource();
|
WebResource r = resource();
|
||||||
|
|
Loading…
Reference in New Issue