YARN-2869. CapacityScheduler should trim sub queue names when parse configuration. Contributed by Wangda Tan
This commit is contained in:
parent
475c6b4978
commit
e69af836f3
|
@ -194,6 +194,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-2461. Fix PROCFS_USE_SMAPS_BASED_RSS_ENABLED property in
|
YARN-2461. Fix PROCFS_USE_SMAPS_BASED_RSS_ENABLED property in
|
||||||
YarnConfiguration. (rchiang via rkanter)
|
YarnConfiguration. (rchiang via rkanter)
|
||||||
|
|
||||||
|
YARN-2869. CapacityScheduler should trim sub queue names when parse
|
||||||
|
configuration. (Wangda Tan via jianhe)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -260,7 +260,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getQueuePrefix(String queue) {
|
static String getQueuePrefix(String queue) {
|
||||||
String queueName = PREFIX + queue + DOT;
|
String queueName = PREFIX + queue + DOT;
|
||||||
return queueName;
|
return queueName;
|
||||||
}
|
}
|
||||||
|
@ -538,6 +538,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
public String[] getQueues(String queue) {
|
public String[] getQueues(String queue) {
|
||||||
LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
|
LOG.debug("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
|
||||||
String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);
|
String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);
|
||||||
|
List<String> trimmedQueueNames = new ArrayList<String>();
|
||||||
|
if (null != queues) {
|
||||||
|
for (String s : queues) {
|
||||||
|
trimmedQueueNames.add(s.trim());
|
||||||
|
}
|
||||||
|
queues = trimmedQueueNames.toArray(new String[0]);
|
||||||
|
}
|
||||||
|
|
||||||
LOG.debug("CSConf - getQueues: queuePrefix=" + getQueuePrefix(queue) +
|
LOG.debug("CSConf - getQueues: queuePrefix=" + getQueuePrefix(queue) +
|
||||||
", queues=" + ((queues == null) ? "" : StringUtils.arrayToString(queues)));
|
", queues=" + ((queues == null) ? "" : StringUtils.arrayToString(queues)));
|
||||||
return queues;
|
return queues;
|
||||||
|
|
|
@ -82,6 +82,56 @@ public class TestQueueParsing {
|
||||||
ServiceOperations.stopQuietly(capacityScheduler);
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setupQueueConfigurationWithSpacesShouldBeTrimmed(
|
||||||
|
CapacitySchedulerConfiguration conf) {
|
||||||
|
// Define top-level queues
|
||||||
|
conf.set(
|
||||||
|
CapacitySchedulerConfiguration
|
||||||
|
.getQueuePrefix(CapacitySchedulerConfiguration.ROOT)
|
||||||
|
+ CapacitySchedulerConfiguration.QUEUES, " a ,b, c");
|
||||||
|
|
||||||
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
conf.setCapacity(A, 10);
|
||||||
|
conf.setMaximumCapacity(A, 15);
|
||||||
|
|
||||||
|
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
|
conf.setCapacity(B, 20);
|
||||||
|
|
||||||
|
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||||
|
conf.setCapacity(C, 70);
|
||||||
|
conf.setMaximumCapacity(C, 70);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(
|
||||||
|
CapacitySchedulerConfiguration conf) {
|
||||||
|
// Define top-level queues
|
||||||
|
conf.set(
|
||||||
|
CapacitySchedulerConfiguration
|
||||||
|
.getQueuePrefix(CapacitySchedulerConfiguration.ROOT)
|
||||||
|
+ CapacitySchedulerConfiguration.QUEUES, " a ,b, c");
|
||||||
|
|
||||||
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
conf.setCapacity(A, 10);
|
||||||
|
conf.setMaximumCapacity(A, 15);
|
||||||
|
|
||||||
|
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
|
conf.setCapacity(B, 20);
|
||||||
|
|
||||||
|
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||||
|
conf.setCapacity(C, 70);
|
||||||
|
conf.setMaximumCapacity(C, 70);
|
||||||
|
|
||||||
|
// sub queues for A
|
||||||
|
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(A)
|
||||||
|
+ CapacitySchedulerConfiguration.QUEUES, "a1, a2 ");
|
||||||
|
|
||||||
|
final String A1 = CapacitySchedulerConfiguration.ROOT + ".a.a1";
|
||||||
|
conf.setCapacity(A1, 60);
|
||||||
|
|
||||||
|
final String A2 = CapacitySchedulerConfiguration.ROOT + ".a.a2";
|
||||||
|
conf.setCapacity(A2, 40);
|
||||||
|
}
|
||||||
|
|
||||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||||
|
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
|
@ -659,4 +709,64 @@ public class TestQueueParsing {
|
||||||
DELTA);
|
DELTA);
|
||||||
capacityScheduler.stop();
|
capacityScheduler.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueParsingShouldTrimSpaces() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
capacityScheduler.setConf(conf);
|
||||||
|
capacityScheduler.setRMContext(TestUtils.getMockRMContext());
|
||||||
|
capacityScheduler.init(conf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
capacityScheduler.reinitialize(conf, TestUtils.getMockRMContext());
|
||||||
|
|
||||||
|
CSQueue a = capacityScheduler.getQueue("a");
|
||||||
|
Assert.assertNotNull(a);
|
||||||
|
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue c = capacityScheduler.getQueue("c");
|
||||||
|
Assert.assertNotNull(c);
|
||||||
|
Assert.assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNestedQueueParsingShouldTrimSpaces() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
setupNestedQueueConfigurationWithSpacesShouldBeTrimmed(csConf);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
capacityScheduler.setConf(conf);
|
||||||
|
capacityScheduler.setRMContext(TestUtils.getMockRMContext());
|
||||||
|
capacityScheduler.init(conf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
capacityScheduler.reinitialize(conf, TestUtils.getMockRMContext());
|
||||||
|
|
||||||
|
CSQueue a = capacityScheduler.getQueue("a");
|
||||||
|
Assert.assertNotNull(a);
|
||||||
|
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue c = capacityScheduler.getQueue("c");
|
||||||
|
Assert.assertNotNull(c);
|
||||||
|
Assert.assertEquals(0.70, c.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.70, c.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue a1 = capacityScheduler.getQueue("a1");
|
||||||
|
Assert.assertNotNull(a1);
|
||||||
|
Assert.assertEquals(0.10 * 0.6, a1.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.15, a1.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue a2 = capacityScheduler.getQueue("a2");
|
||||||
|
Assert.assertNotNull(a2);
|
||||||
|
Assert.assertEquals(0.10 * 0.4, a2.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.15, a2.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue