YARN-10823. Expose all node labels for root without explicit configurations. Contributed by Andras Gyori
This commit is contained in:
parent
3c9e3d5321
commit
ed8e879320
|
@ -77,6 +77,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
|
||||||
|
|
||||||
public abstract class AbstractCSQueue implements CSQueue {
|
public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
@ -463,8 +464,13 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
if (csContext.getCapacitySchedulerQueueManager() != null
|
if (csContext.getCapacitySchedulerQueueManager() != null
|
||||||
&& csContext.getCapacitySchedulerQueueManager()
|
&& csContext.getCapacitySchedulerQueueManager()
|
||||||
.getConfiguredNodeLabels() != null) {
|
.getConfiguredNodeLabels() != null) {
|
||||||
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
if (getQueuePath().equals(ROOT)) {
|
||||||
.getConfiguredNodeLabels().getLabelsByQueue(getQueuePath());
|
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
||||||
|
.getConfiguredNodeLabels().getAllConfiguredLabels();
|
||||||
|
} else {
|
||||||
|
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
||||||
|
.getConfiguredNodeLabels().getLabelsByQueue(getQueuePath());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Fallback to suboptimal but correct logic
|
// Fallback to suboptimal but correct logic
|
||||||
this.configuredNodeLabels = csContext.getConfiguration()
|
this.configuredNodeLabels = csContext.getConfiguration()
|
||||||
|
|
|
@ -25,6 +25,7 @@ import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Contains node labels for all queues extracted from configuration properties.
|
* Contains node labels for all queues extracted from configuration properties.
|
||||||
|
@ -74,4 +75,19 @@ public class ConfiguredNodeLabels {
|
||||||
String queuePath, Collection<String> nodeLabels) {
|
String queuePath, Collection<String> nodeLabels) {
|
||||||
configuredNodeLabelsByQueue.put(queuePath, new HashSet<>(nodeLabels));
|
configuredNodeLabelsByQueue.put(queuePath, new HashSet<>(nodeLabels));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all configured node labels aggregated from each queue.
|
||||||
|
* @return all node labels
|
||||||
|
*/
|
||||||
|
public Set<String> getAllConfiguredLabels() {
|
||||||
|
Set<String> nodeLabels = configuredNodeLabelsByQueue.values().stream()
|
||||||
|
.flatMap(Set::stream).collect(Collectors.toSet());
|
||||||
|
|
||||||
|
if (nodeLabels.size() == 0) {
|
||||||
|
nodeLabels = NO_LABEL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nodeLabels;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@ import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
|
import org.apache.hadoop.util.Sets;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -5216,6 +5217,37 @@ public class TestLeafQueue {
|
||||||
e.getMaxApplications());
|
e.getMaxApplications());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRootHasAllNodeLabelsOfItsDescendants() throws IOException {
|
||||||
|
CapacitySchedulerConfiguration conf = csConf;
|
||||||
|
String rootChild = root.getChildQueues().get(0).getQueuePath();
|
||||||
|
|
||||||
|
conf.setCapacityByLabel(rootChild, "test", 100);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + A, "test", 20);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + B, "test", 40);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + C, "test", 10);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + C + "." + C1, "test", 100);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + D, "test", 30);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + E, "test", 0);
|
||||||
|
|
||||||
|
conf.setCapacityByLabel(rootChild, "test2", 100);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + A, "test2", 20);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + B, "test2", 40);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + C, "test2", 10);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + C + "." + C1, "test2", 100);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + D, "test2", 30);
|
||||||
|
conf.setCapacityByLabel(rootChild + "." + E, "test2", 0);
|
||||||
|
|
||||||
|
cs.getCapacitySchedulerQueueManager().reinitConfiguredNodeLabels(conf);
|
||||||
|
cs.setMaxRunningAppsEnforcer(new CSMaxRunningAppsEnforcer(cs));
|
||||||
|
cs.reinitialize(conf, cs.getRMContext());
|
||||||
|
|
||||||
|
ParentQueue rootQueue = (ParentQueue) cs.getRootQueue();
|
||||||
|
|
||||||
|
Assert.assertEquals(Sets.newHashSet("", "test", "test2"),
|
||||||
|
rootQueue.configuredNodeLabels);
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (cs != null) {
|
if (cs != null) {
|
||||||
|
|
Loading…
Reference in New Issue