YARN-2744. Fixed CapacityScheduler to validate node-labels correctly against queues. Contributed by Wangda Tan.
(cherry picked from commit a3839a9fbf
)
This commit is contained in:
parent
22ecbd4ab3
commit
52e57a95d9
|
@ -855,6 +855,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2812. TestApplicationHistoryServer is likely to fail on less powerful machine.
|
YARN-2812. TestApplicationHistoryServer is likely to fail on less powerful machine.
|
||||||
(Zhijie Shen via xgong)
|
(Zhijie Shen via xgong)
|
||||||
|
|
||||||
|
YARN-2744. Fixed CapacityScheduler to validate node-labels correctly against
|
||||||
|
queues. (Wangda Tan via vinodkv)
|
||||||
|
|
||||||
Release 2.5.2 - UNRELEASED
|
Release 2.5.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -105,9 +105,9 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
// inherit from parent if labels not set
|
// inherit from parent if labels not set
|
||||||
if (this.accessibleLabels == null && parent != null) {
|
if (this.accessibleLabels == null && parent != null) {
|
||||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||||
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
|
|
||||||
this.accessibleLabels);
|
|
||||||
}
|
}
|
||||||
|
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
|
||||||
|
this.accessibleLabels);
|
||||||
|
|
||||||
// inherit from parent if labels not set
|
// inherit from parent if labels not set
|
||||||
if (this.defaultLabelExpression == null && parent != null
|
if (this.defaultLabelExpression == null && parent != null
|
||||||
|
|
|
@ -26,9 +26,11 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.service.ServiceOperations;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
|
@ -80,7 +82,7 @@ public class TestQueueParsing {
|
||||||
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
|
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
|
||||||
Assert.assertEquals(0.7 * 0.55 * 0.7,
|
Assert.assertEquals(0.7 * 0.55 * 0.7,
|
||||||
c12.getAbsoluteMaximumCapacity(), DELTA);
|
c12.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
capacityScheduler.stop();
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||||
|
@ -164,7 +166,7 @@ public class TestQueueParsing {
|
||||||
capacityScheduler.init(conf);
|
capacityScheduler.init(conf);
|
||||||
capacityScheduler.start();
|
capacityScheduler.start();
|
||||||
capacityScheduler.reinitialize(conf, null);
|
capacityScheduler.reinitialize(conf, null);
|
||||||
capacityScheduler.stop();
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMaxCapacity() throws Exception {
|
public void testMaxCapacity() throws Exception {
|
||||||
|
@ -339,6 +341,27 @@ public class TestQueueParsing {
|
||||||
conf.setCapacityByLabel(A2, "red", 50);
|
conf.setCapacityByLabel(A2, "red", 50);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void setupQueueConfigurationWithSingleLevel(
|
||||||
|
CapacitySchedulerConfiguration conf) {
|
||||||
|
// Define top-level queues
|
||||||
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
||||||
|
|
||||||
|
// Set A configuration
|
||||||
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
|
conf.setCapacity(A, 10);
|
||||||
|
conf.setMaximumCapacity(A, 15);
|
||||||
|
conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
|
||||||
|
conf.setCapacityByLabel(A, "red", 90);
|
||||||
|
conf.setCapacityByLabel(A, "blue", 90);
|
||||||
|
|
||||||
|
// Set B configuraiton
|
||||||
|
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
|
conf.setCapacity(B, 90);
|
||||||
|
conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue"));
|
||||||
|
conf.setCapacityByLabel(B, "red", 10);
|
||||||
|
conf.setCapacityByLabel(B, "blue", 10);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueParsingReinitializeWithLabels() throws IOException {
|
public void testQueueParsingReinitializeWithLabels() throws IOException {
|
||||||
CapacitySchedulerConfiguration csConf =
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
@ -362,7 +385,7 @@ public class TestQueueParsing {
|
||||||
conf = new YarnConfiguration(csConf);
|
conf = new YarnConfiguration(csConf);
|
||||||
capacityScheduler.reinitialize(conf, rmContext);
|
capacityScheduler.reinitialize(conf, rmContext);
|
||||||
checkQueueLabels(capacityScheduler);
|
checkQueueLabels(capacityScheduler);
|
||||||
capacityScheduler.stop();
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkQueueLabels(CapacityScheduler capacityScheduler) {
|
private void checkQueueLabels(CapacityScheduler capacityScheduler) {
|
||||||
|
@ -429,7 +452,7 @@ public class TestQueueParsing {
|
||||||
capacityScheduler.init(csConf);
|
capacityScheduler.init(csConf);
|
||||||
capacityScheduler.start();
|
capacityScheduler.start();
|
||||||
checkQueueLabels(capacityScheduler);
|
checkQueueLabels(capacityScheduler);
|
||||||
capacityScheduler.stop();
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -451,6 +474,117 @@ public class TestQueueParsing {
|
||||||
capacityScheduler.init(csConf);
|
capacityScheduler.init(csConf);
|
||||||
capacityScheduler.start();
|
capacityScheduler.start();
|
||||||
checkQueueLabelsInheritConfig(capacityScheduler);
|
checkQueueLabelsInheritConfig(capacityScheduler);
|
||||||
capacityScheduler.stop();
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testQueueParsingWhenLabelsNotExistedInNodeLabelManager()
|
||||||
|
throws IOException {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
setupQueueConfigurationWithLabels(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
RMContextImpl rmContext =
|
||||||
|
new RMContextImpl(null, null, null, null, null, null,
|
||||||
|
new RMContainerTokenSecretManager(csConf),
|
||||||
|
new NMTokenSecretManagerInRM(csConf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM(), null);
|
||||||
|
|
||||||
|
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
|
||||||
|
nodeLabelsManager.init(conf);
|
||||||
|
nodeLabelsManager.start();
|
||||||
|
|
||||||
|
rmContext.setNodeLabelManager(nodeLabelsManager);
|
||||||
|
capacityScheduler.setConf(csConf);
|
||||||
|
capacityScheduler.setRMContext(rmContext);
|
||||||
|
capacityScheduler.init(csConf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
|
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testQueueParsingWhenLabelsInheritedNotExistedInNodeLabelManager()
|
||||||
|
throws IOException {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
setupQueueConfigurationWithLabelsInherit(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
RMContextImpl rmContext =
|
||||||
|
new RMContextImpl(null, null, null, null, null, null,
|
||||||
|
new RMContainerTokenSecretManager(csConf),
|
||||||
|
new NMTokenSecretManagerInRM(csConf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM(), null);
|
||||||
|
|
||||||
|
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
|
||||||
|
nodeLabelsManager.init(conf);
|
||||||
|
nodeLabelsManager.start();
|
||||||
|
|
||||||
|
rmContext.setNodeLabelManager(nodeLabelsManager);
|
||||||
|
capacityScheduler.setConf(csConf);
|
||||||
|
capacityScheduler.setRMContext(rmContext);
|
||||||
|
capacityScheduler.init(csConf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
|
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testSingleLevelQueueParsingWhenLabelsNotExistedInNodeLabelManager()
|
||||||
|
throws IOException {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
setupQueueConfigurationWithSingleLevel(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
RMContextImpl rmContext =
|
||||||
|
new RMContextImpl(null, null, null, null, null, null,
|
||||||
|
new RMContainerTokenSecretManager(csConf),
|
||||||
|
new NMTokenSecretManagerInRM(csConf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM(), null);
|
||||||
|
|
||||||
|
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
|
||||||
|
nodeLabelsManager.init(conf);
|
||||||
|
nodeLabelsManager.start();
|
||||||
|
|
||||||
|
rmContext.setNodeLabelManager(nodeLabelsManager);
|
||||||
|
capacityScheduler.setConf(csConf);
|
||||||
|
capacityScheduler.setRMContext(rmContext);
|
||||||
|
capacityScheduler.init(csConf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
|
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testQueueParsingWhenLabelsNotExist() throws IOException {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
setupQueueConfigurationWithLabels(csConf);
|
||||||
|
|
||||||
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
|
RMContextImpl rmContext =
|
||||||
|
new RMContextImpl(null, null, null, null, null, null,
|
||||||
|
new RMContainerTokenSecretManager(csConf),
|
||||||
|
new NMTokenSecretManagerInRM(csConf),
|
||||||
|
new ClientToAMTokenSecretManagerInRM(), null);
|
||||||
|
|
||||||
|
RMNodeLabelsManager nodeLabelsManager = new MemoryRMNodeLabelsManager();
|
||||||
|
nodeLabelsManager.init(conf);
|
||||||
|
nodeLabelsManager.start();
|
||||||
|
|
||||||
|
rmContext.setNodeLabelManager(nodeLabelsManager);
|
||||||
|
capacityScheduler.setConf(csConf);
|
||||||
|
capacityScheduler.setRMContext(rmContext);
|
||||||
|
capacityScheduler.init(csConf);
|
||||||
|
capacityScheduler.start();
|
||||||
|
ServiceOperations.stopQuietly(capacityScheduler);
|
||||||
|
ServiceOperations.stopQuietly(nodeLabelsManager);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue