YARN-2824. Fixed Capacity Scheduler to not crash when some node-labels are not mapped to queues by making default capacities per label to be zero. Contributed by Wangda Tan.
(cherry picked from commit 2ac1be7dec
)
This commit is contained in:
parent
bf79541868
commit
d5d2905262
|
@ -867,6 +867,10 @@ Release 2.6.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-2810. TestRMProxyUsersConf fails on Windows VMs. (Varun Vasudev via xgong)
|
YARN-2810. TestRMProxyUsersConf fails on Windows VMs. (Varun Vasudev via xgong)
|
||||||
|
|
||||||
|
YARN-2824. Fixed Capacity Scheduler to not crash when some node-labels are
|
||||||
|
not mapped to queues by making default capacities per label to be zero.
|
||||||
|
(Wangda Tan via vinodkv)
|
||||||
|
|
||||||
Release 2.5.2 - UNRELEASED
|
Release 2.5.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -202,6 +202,7 @@
|
||||||
<Field name="accessibleLabels" />
|
<Field name="accessibleLabels" />
|
||||||
<Field name="absoluteNodeLabelCapacities" />
|
<Field name="absoluteNodeLabelCapacities" />
|
||||||
<Field name="reservationsContinueLooking" />
|
<Field name="reservationsContinueLooking" />
|
||||||
|
<Field name="absoluteCapacityByNodeLabels" />
|
||||||
</Or>
|
</Or>
|
||||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||||
</Match>
|
</Match>
|
||||||
|
|
|
@ -86,6 +86,8 @@ public class TestDistributedShell {
|
||||||
|
|
||||||
// Setup queue access to node labels
|
// Setup queue access to node labels
|
||||||
conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
|
conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
|
||||||
|
conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
|
||||||
|
"100");
|
||||||
conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
|
conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
|
||||||
conf.set(
|
conf.set(
|
||||||
"yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
|
"yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
|
||||||
|
|
|
@ -395,16 +395,15 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public float getCapacityByNodeLabel(String label) {
|
public float getCapacityByNodeLabel(String label) {
|
||||||
if (null == parent) {
|
|
||||||
return 1f;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
|
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
|
||||||
|
if (null == parent) {
|
||||||
|
return 1f;
|
||||||
|
}
|
||||||
return getCapacity();
|
return getCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!capacitiyByNodeLabels.containsKey(label)) {
|
if (!capacitiyByNodeLabels.containsKey(label)) {
|
||||||
return 0;
|
return 0f;
|
||||||
} else {
|
} else {
|
||||||
return capacitiyByNodeLabels.get(label);
|
return capacitiyByNodeLabels.get(label);
|
||||||
}
|
}
|
||||||
|
@ -412,18 +411,17 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public float getAbsoluteCapacityByNodeLabel(String label) {
|
public float getAbsoluteCapacityByNodeLabel(String label) {
|
||||||
if (null == parent) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
|
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
|
||||||
|
if (null == parent) {
|
||||||
|
return 1f;
|
||||||
|
}
|
||||||
return getAbsoluteCapacity();
|
return getAbsoluteCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
|
if (!absoluteCapacityByNodeLabels.containsKey(label)) {
|
||||||
return 0;
|
return 0f;
|
||||||
} else {
|
} else {
|
||||||
return absoluteMaxCapacityByNodeLabels.get(label);
|
return absoluteCapacityByNodeLabels.get(label);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,7 +431,11 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return getAbsoluteMaximumCapacity();
|
return getAbsoluteMaximumCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
return getAbsoluteCapacityByNodeLabel(label);
|
if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
|
||||||
|
return 0f;
|
||||||
|
} else {
|
||||||
|
return absoluteMaxCapacityByNodeLabels.get(label);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
|
|
@ -461,19 +461,8 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
|
|
||||||
for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
|
for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
|
||||||
.getClusterNodeLabels() : labels) {
|
.getClusterNodeLabels() : labels) {
|
||||||
// capacity of all labels in each queue should be 1
|
|
||||||
if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) {
|
|
||||||
nodeLabelCapacities.put(label, 1.0f);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
|
String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
|
||||||
float capacity = getFloat(capacityPropertyName, UNDEFINED);
|
float capacity = getFloat(capacityPropertyName, 0f);
|
||||||
if (capacity == UNDEFINED) {
|
|
||||||
throw new IllegalArgumentException("Configuration issue: "
|
|
||||||
+ " node-label=" + label + " is accessible from queue=" + queue
|
|
||||||
+ " but has no capacity set, you should set "
|
|
||||||
+ capacityPropertyName + " in range of [0, 100].");
|
|
||||||
}
|
|
||||||
if (capacity < MINIMUM_CAPACITY_VALUE
|
if (capacity < MINIMUM_CAPACITY_VALUE
|
||||||
|| capacity > MAXIMUM_CAPACITY_VALUE) {
|
|| capacity > MAXIMUM_CAPACITY_VALUE) {
|
||||||
throw new IllegalArgumentException("Illegal capacity of " + capacity
|
throw new IllegalArgumentException("Illegal capacity of " + capacity
|
||||||
|
@ -501,9 +490,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
.getClusterNodeLabels() : labels) {
|
.getClusterNodeLabels() : labels) {
|
||||||
float maxCapacity =
|
float maxCapacity =
|
||||||
getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
|
getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
|
||||||
UNDEFINED);
|
100f);
|
||||||
maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ?
|
|
||||||
MAXIMUM_CAPACITY_VALUE : maxCapacity;
|
|
||||||
if (maxCapacity < MINIMUM_CAPACITY_VALUE
|
if (maxCapacity < MINIMUM_CAPACITY_VALUE
|
||||||
|| maxCapacity > MAXIMUM_CAPACITY_VALUE) {
|
|| maxCapacity > MAXIMUM_CAPACITY_VALUE) {
|
||||||
throw new IllegalArgumentException("Illegal " + "capacity of "
|
throw new IllegalArgumentException("Illegal " + "capacity of "
|
||||||
|
|
|
@ -340,6 +340,8 @@ public class TestContainerAllocation {
|
||||||
|
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
|
||||||
|
|
||||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
conf.setCapacity(A, 10);
|
conf.setCapacity(A, 10);
|
||||||
|
@ -403,6 +405,9 @@ public class TestContainerAllocation {
|
||||||
|
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
|
||||||
|
|
||||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
conf.setCapacity(A, 10);
|
conf.setCapacity(A, 10);
|
||||||
|
|
|
@ -18,10 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -51,8 +47,9 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
nodeLabelManager = mock(RMNodeLabelsManager.class);
|
nodeLabelManager = new MemoryRMNodeLabelsManager();
|
||||||
when(nodeLabelManager.containsNodeLabel(any(String.class))).thenReturn(true);
|
nodeLabelManager.init(new YarnConfiguration());
|
||||||
|
nodeLabelManager.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -255,6 +252,8 @@ public class TestQueueParsing {
|
||||||
private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
|
private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
|
||||||
|
|
||||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
conf.setCapacity(A, 10);
|
conf.setCapacity(A, 10);
|
||||||
|
@ -271,6 +270,7 @@ public class TestQueueParsing {
|
||||||
conf.setQueues(A, new String[] {"a1", "a2"});
|
conf.setQueues(A, new String[] {"a1", "a2"});
|
||||||
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
|
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
|
||||||
conf.setCapacityByLabel(A, "red", 50);
|
conf.setCapacityByLabel(A, "red", 50);
|
||||||
|
conf.setMaximumCapacityByLabel(A, "red", 50);
|
||||||
conf.setCapacityByLabel(A, "blue", 50);
|
conf.setCapacityByLabel(A, "blue", 50);
|
||||||
|
|
||||||
conf.setCapacity(A1, 30);
|
conf.setCapacity(A1, 30);
|
||||||
|
@ -282,6 +282,7 @@ public class TestQueueParsing {
|
||||||
conf.setMaximumCapacity(A2, 85);
|
conf.setMaximumCapacity(A2, 85);
|
||||||
conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
|
conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
|
||||||
conf.setCapacityByLabel(A2, "red", 50);
|
conf.setCapacityByLabel(A2, "red", 50);
|
||||||
|
conf.setMaximumCapacityByLabel(A2, "red", 60);
|
||||||
|
|
||||||
final String B1 = B + ".b1";
|
final String B1 = B + ".b1";
|
||||||
final String B2 = B + ".b2";
|
final String B2 = B + ".b2";
|
||||||
|
@ -311,6 +312,8 @@ public class TestQueueParsing {
|
||||||
CapacitySchedulerConfiguration conf) {
|
CapacitySchedulerConfiguration conf) {
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
|
||||||
|
conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
|
||||||
|
|
||||||
// Set A configuration
|
// Set A configuration
|
||||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
@ -364,6 +367,7 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueParsingReinitializeWithLabels() throws IOException {
|
public void testQueueParsingReinitializeWithLabels() throws IOException {
|
||||||
|
nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
|
||||||
CapacitySchedulerConfiguration csConf =
|
CapacitySchedulerConfiguration csConf =
|
||||||
new CapacitySchedulerConfiguration();
|
new CapacitySchedulerConfiguration();
|
||||||
setupQueueConfigurationWithoutLabels(csConf);
|
setupQueueConfigurationWithoutLabels(csConf);
|
||||||
|
@ -410,6 +414,22 @@ public class TestQueueParsing {
|
||||||
// queue-B2 inherits "red"/"blue"
|
// queue-B2 inherits "red"/"blue"
|
||||||
Assert.assertTrue(capacityScheduler.getQueue("b2")
|
Assert.assertTrue(capacityScheduler.getQueue("b2")
|
||||||
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
|
.getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
|
||||||
|
|
||||||
|
// check capacity of A2
|
||||||
|
CSQueue qA2 = capacityScheduler.getQueue("a2");
|
||||||
|
Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.5, qA2.getCapacityByNodeLabel("red"), DELTA);
|
||||||
|
Assert.assertEquals(0.07, qA2.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.25, qA2.getAbsoluteCapacityByNodeLabel("red"), DELTA);
|
||||||
|
Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.3, qA2.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
|
||||||
|
|
||||||
|
// check capacity of B3
|
||||||
|
CSQueue qB3 = capacityScheduler.getQueue("b3");
|
||||||
|
Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.125, qB3.getAbsoluteCapacityByNodeLabel("red"), DELTA);
|
||||||
|
Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(1, qB3.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void
|
private void
|
||||||
|
@ -435,6 +455,8 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueParsingWithLabels() throws IOException {
|
public void testQueueParsingWithLabels() throws IOException {
|
||||||
|
nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
|
||||||
|
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
CapacitySchedulerConfiguration csConf =
|
CapacitySchedulerConfiguration csConf =
|
||||||
new CapacitySchedulerConfiguration(conf);
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
@ -457,6 +479,8 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueParsingWithLabelsInherit() throws IOException {
|
public void testQueueParsingWithLabelsInherit() throws IOException {
|
||||||
|
nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
|
||||||
|
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
CapacitySchedulerConfiguration csConf =
|
CapacitySchedulerConfiguration csConf =
|
||||||
new CapacitySchedulerConfiguration(conf);
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
@ -587,4 +611,52 @@ public class TestQueueParsing {
|
||||||
ServiceOperations.stopQuietly(capacityScheduler);
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
ServiceOperations.stopQuietly(nodeLabelsManager);
|
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueParsingWithUnusedLabels() throws IOException {
|
||||||
|
final ImmutableSet<String> labels = ImmutableSet.of("red", "blue");
|
||||||
|
|
||||||
|
// Initialize a cluster with labels, but doesn't use them, reinitialize
|
||||||
|
// shouldn't fail
|
||||||
|
nodeLabelManager.addToCluserNodeLabels(labels);
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(csConf);
|
||||||
|
csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT, labels);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
capacityScheduler.setConf(conf);
|
||||||
|
RMContextImpl rmContext =
|
||||||
|
new RMContextImpl(null, null, null, null, null, null,
|
||||||
|
new RMContainerTokenSecretManager(csConf),
|
||||||
|
new NMTokenSecretManagerInRM(csConf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM(), null);
|
||||||
|
rmContext.setNodeLabelManager(nodeLabelManager);
|
||||||
|
capacityScheduler.setRMContext(rmContext);
|
||||||
|
capacityScheduler.init(conf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
capacityScheduler.reinitialize(conf, rmContext);
|
||||||
|
|
||||||
|
// check root queue's capacity by label -- they should be all zero
|
||||||
|
CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
|
||||||
|
Assert.assertEquals(0, root.getCapacityByNodeLabel("red"), DELTA);
|
||||||
|
Assert.assertEquals(0, root.getCapacityByNodeLabel("blue"), DELTA);
|
||||||
|
|
||||||
|
CSQueue a = capacityScheduler.getQueue("a");
|
||||||
|
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue b1 = capacityScheduler.getQueue("b1");
|
||||||
|
Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals("Parent B has no MAX_CAP", 0.85,
|
||||||
|
b1.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue c12 = capacityScheduler.getQueue("c12");
|
||||||
|
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(),
|
||||||
|
DELTA);
|
||||||
|
capacityScheduler.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue