YARN-7274. Ability to disable elasticity at leaf queue level. (Zian Chen via wangda)

Change-Id: Ic8d43e297f0f5de788b562f7eff8106c5c35e8d2
This commit is contained in:
Wangda Tan 2017-12-08 15:07:56 -08:00
parent ef7d334d36
commit 74665e3a7d
3 changed files with 329 additions and 18 deletions

View File

@ -66,19 +66,9 @@ class CSQueueUtils {
private static void capacitiesSanityCheck(String queueName,
QueueCapacities queueCapacities) {
for (String label : queueCapacities.getExistingNodeLabels()) {
float capacity = queueCapacities.getCapacity(label);
float maximumCapacity = queueCapacities.getMaximumCapacity(label);
if (capacity > maximumCapacity) {
throw new IllegalArgumentException("Illegal queue capacity setting, "
+ "(capacity=" + capacity + ") > (maximum-capacity="
+ maximumCapacity + "). When label=[" + label + "]");
}
// Actually, this may not needed since we have verified capacity <=
// maximumCapacity. And the way we compute absolute capacity (abs(x) =
// cap(x) * cap(x.parent) * ...) is a monotone increasing function. But
// just keep it here to make sure our compute abs capacity method works
// correctly.
// The only thing we should care about is absolute capacity <=
// absolute max capacity otherwise the absolute max capacity is
// no longer an absolute maximum.
float absCapacity = queueCapacities.getAbsoluteCapacity(label);
float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label);
if (absCapacity > absMaxCapacity) {

View File

@ -2083,4 +2083,167 @@ public class TestNodeLabelContainerAllocation {
rm1.close();
}
@Test
public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
/**
* Test case: have a following queue structure:
*
* <pre>
*
* root
* / \
* a b
* (x) (x)
* / \
* a1 a2
* (x) (x)
* </pre>
*
* a/b can access x, both of them has max-capacity-on-x = 50
*
* When doing non-exclusive allocation, app in a (or b) can use 100% of x
* resource.
*/
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b" });
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(queueA, 50);
csConf.setMaximumCapacity(queueA, 100);
csConf.setAccessibleNodeLabels(queueA, toSet("x"));
csConf.setCapacityByLabel(queueA, "x", 50);
csConf.setMaximumCapacityByLabel(queueA, "x", 100);
final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(queueB, 50);
csConf.setMaximumCapacity(queueB, 100);
csConf.setAccessibleNodeLabels(queueB, toSet("x"));
csConf.setCapacityByLabel(queueB, "x", 50);
csConf.setMaximumCapacityByLabel(queueB, "x", 100);
// Define 2nd-level queues
csConf.setQueues(queueA, new String[] { "a1",
"a2"});
final String A1 = queueA + ".a1";
csConf.setCapacity(A1, 20);
csConf.setMaximumCapacity(A1, 60);
csConf.setAccessibleNodeLabels(A1, toSet("x"));
csConf.setCapacityByLabel(A1, "x", 60);
csConf.setMaximumCapacityByLabel(A1, "x", 30);
final String A2 = queueA + ".a2";
csConf.setCapacity(A2, 80);
csConf.setMaximumCapacity(A2, 40);
csConf.setAccessibleNodeLabels(A2, toSet("x"));
csConf.setCapacityByLabel(A2, "x", 40);
csConf.setMaximumCapacityByLabel(A2, "x", 20);
// set node -> label
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
// app1 -> a1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// app1 asks for 6 partition=x containers
am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
doNMHeartbeat(rm1, nm1.getNodeId(), 50);
checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(14 * GB,
reportNm1.getAvailableResource().getMemorySize());
// Try to launch app2 in a2, asked 2GB, should success
// app2 -> a2
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
// app2 asks for 4 partition=x containers
am2.allocate("*", 1 * GB, 4, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
doNMHeartbeat(rm1, nm1.getNodeId(), 50);
checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(10 * GB,
reportNm1.getAvailableResource().getMemorySize());
// Kill all apps in queue a2
cs.killAllAppsInQueue("a2");
rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED);
rm1.waitForAppRemovedFromScheduler(app2.getApplicationId());
// Try to launch app3 in a2, asked 6GB, should fail
// app3 -> a2
RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
am3.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
doNMHeartbeat(rm1, nm1.getNodeId(), 50);
// app3 cannot preempt more resources restricted by disable elasticity
checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(),
cs.getApplicationAttempt(am3.getApplicationAttemptId()));
Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(10 * GB,
reportNm1.getAvailableResource().getMemorySize());
// Kill all apps in queue a1
cs.killAllAppsInQueue("a1");
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
// app4 -> a1, try to allocate more than 6GB resource, should fail
RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm1);
// app3 asks for 7 partition=x containers
am4.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
doNMHeartbeat(rm1, nm1.getNodeId(), 50);
// app4 should only gets 6GB resource in partition=x
// since elasticity is disabled
checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(),
cs.getApplicationAttempt(am4.getApplicationAttemptId()));
Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(10 * GB,
reportNm1.getAvailableResource().getMemorySize());
rm1.close();
}
}

View File

@ -365,7 +365,76 @@ public class TestQueueParsing {
conf.setCapacityByLabel(B3, "red", 25);
conf.setCapacityByLabel(B3, "blue", 25);
}
private void setupQueueConfigurationWithLabelsAndReleaseCheck
(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
// The cap <= max-cap check is not needed
conf.setCapacity(A, 50);
conf.setMaximumCapacity(A, 100);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
conf.setCapacity(B, 50);
conf.setMaximumCapacity(B, 100);
LOG.info("Setup top-level queues");
// Define 2nd-level queues
final String A1 = A + ".a1";
final String A2 = A + ".a2";
conf.setQueues(A, new String[] {"a1", "a2"});
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(A, "red", 50);
conf.setMaximumCapacityByLabel(A, "red", 100);
conf.setCapacityByLabel(A, "blue", 30);
conf.setMaximumCapacityByLabel(A, "blue", 50);
conf.setCapacity(A1, 60);
conf.setMaximumCapacity(A1, 60);
conf.setCapacityByLabel(A1, "red", 60);
conf.setMaximumCapacityByLabel(A1, "red", 30);
conf.setCapacityByLabel(A1, "blue", 100);
conf.setMaximumCapacityByLabel(A1, "blue", 100);
conf.setCapacity(A2, 40);
conf.setMaximumCapacity(A2, 85);
conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
conf.setCapacityByLabel(A2, "red", 40);
conf.setMaximumCapacityByLabel(A2, "red", 60);
final String B1 = B + ".b1";
final String B2 = B + ".b2";
final String B3 = B + ".b3";
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue"));
conf.setCapacityByLabel(B, "red", 50);
conf.setMaximumCapacityByLabel(B, "red", 100);
conf.setCapacityByLabel(B, "blue", 70);
conf.setMaximumCapacityByLabel(B, "blue", 100);
conf.setCapacity(B1, 10);
conf.setMaximumCapacity(B1, 10);
conf.setCapacityByLabel(B1, "red", 60);
conf.setMaximumCapacityByLabel(B1, "red", 30);
conf.setCapacityByLabel(B1, "blue", 50);
conf.setMaximumCapacityByLabel(B1, "blue", 100);
conf.setCapacity(B2, 80);
conf.setMaximumCapacity(B2, 40);
conf.setCapacityByLabel(B2, "red", 30);
conf.setCapacityByLabel(B2, "blue", 25);
conf.setCapacity(B3, 10);
conf.setMaximumCapacity(B3, 25);
conf.setCapacityByLabel(B3, "red", 10);
conf.setCapacityByLabel(B3, "blue", 25);
}
private void setupQueueConfigurationWithLabelsInherit(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
@ -472,7 +541,7 @@ public class TestQueueParsing {
// queue-B2 inherits "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b2")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// check capacity of A2
CSQueue qA2 = capacityScheduler.getQueue("a2");
Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
@ -481,7 +550,7 @@ public class TestQueueParsing {
Assert.assertEquals(0.25, qA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(0.3, qA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
// check capacity of B3
CSQueue qB3 = capacityScheduler.getQueue("b3");
Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
@ -489,7 +558,71 @@ public class TestQueueParsing {
Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(1, qB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
}
private void checkQueueLabelsWithLeafQueueDisableElasticity
(CapacityScheduler capacityScheduler) {
// queue-A is red, blue
Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleNodeLabels()
.containsAll(ImmutableSet.of("red", "blue")));
// queue-A1 inherits A's configuration
Assert.assertTrue(capacityScheduler.getQueue("a1")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// queue-A2 is "red"
Assert.assertEquals(1, capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().size());
Assert.assertTrue(capacityScheduler.getQueue("a2")
.getAccessibleNodeLabels().contains("red"));
// queue-B is "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b").getAccessibleNodeLabels()
.containsAll(ImmutableSet.of("red", "blue")));
// queue-B2 inherits "red"/"blue"
Assert.assertTrue(capacityScheduler.getQueue("b2")
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
// check capacity of A2
CSQueue qA2 = capacityScheduler.getQueue("a2");
Assert.assertEquals(0.4, qA2.getCapacity(), DELTA);
Assert.assertEquals(0.4, qA2.getQueueCapacities()
.getCapacity("red"), DELTA);
Assert.assertEquals(0.2, qA2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.2, qA2.getQueueCapacities()
.getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.85, qA2.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(0.6, qA2.getQueueCapacities()
.getAbsoluteMaximumCapacity("red"), DELTA);
// check disable elasticity at leaf queue level without label
CSQueue qB2 = capacityScheduler.getQueue("b2");
Assert.assertEquals(0.4, qB2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.4, qB2.getAbsoluteMaximumCapacity(), DELTA);
// check disable elasticity at leaf queue level with label
CSQueue qA1 = capacityScheduler.getQueue("a1");
Assert.assertEquals(0.3, qA1.getQueueCapacities().
getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.3, qA1.getQueueCapacities().
getAbsoluteMaximumCapacity("red"), DELTA);
CSQueue qB1 = capacityScheduler.getQueue("b1");
Assert.assertEquals(0.3, qB1.getQueueCapacities()
.getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.3, qB1.getQueueCapacities()
.getAbsoluteMaximumCapacity("red"), DELTA);
// check capacity of B3
CSQueue qB3 = capacityScheduler.getQueue("b3");
Assert.assertEquals(0.05, qB3.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.175, qB3.getQueueCapacities()
.getAbsoluteCapacity("blue"), DELTA);
Assert.assertEquals(0.25, qB3.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(1, qB3.getQueueCapacities()
.getAbsoluteMaximumCapacity("blue"), DELTA);
}
private void
checkQueueLabelsInheritConfig(CapacityScheduler capacityScheduler) {
// queue-A is red, blue
@ -514,7 +647,7 @@ public class TestQueueParsing {
@Test
public void testQueueParsingWithLabels() throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
@ -534,6 +667,31 @@ public class TestQueueParsing {
checkQueueLabels(capacityScheduler);
ServiceOperations.stopQuietly(capacityScheduler);
}
@Test
public void testQueueParsingWithLeafQueueDisableElasticity()
throws IOException {
nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity
(ImmutableSet.of("red", "blue"));
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(conf);
setupQueueConfigurationWithLabelsAndReleaseCheck(csConf);
CapacityScheduler capacityScheduler = new CapacityScheduler();
RMContextImpl rmContext =
new RMContextImpl(null, null, null, null, null, null,
new RMContainerTokenSecretManager(csConf),
new NMTokenSecretManagerInRM(csConf),
new ClientToAMTokenSecretManagerInRM(), null);
rmContext.setNodeLabelManager(nodeLabelManager);
capacityScheduler.setConf(csConf);
capacityScheduler.setRMContext(rmContext);
capacityScheduler.init(csConf);
capacityScheduler.start();
checkQueueLabelsWithLeafQueueDisableElasticity(capacityScheduler);
ServiceOperations.stopQuietly(capacityScheduler);
}
@Test
public void testQueueParsingWithLabelsInherit() throws IOException {