YARN-10780. Optimise retrieval of configured node labels in CS queues. Contributed by Andras Gyori.
This commit is contained in:
parent
0d07837712
commit
0934e783cf
|
@ -76,6 +76,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
|
||||
|
||||
public abstract class AbstractCSQueue implements CSQueue {
|
||||
|
@ -95,6 +96,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
|
||||
final ResourceCalculator resourceCalculator;
|
||||
Set<String> accessibleLabels;
|
||||
protected Set<String> configuredNodeLabels;
|
||||
Set<String> resourceTypes;
|
||||
final RMNodeLabelsManager labelManager;
|
||||
String defaultLabelExpression;
|
||||
|
@ -208,7 +210,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
protected void setupConfigurableCapacities(
|
||||
CapacitySchedulerConfiguration configuration) {
|
||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(getQueuePath(), queueCapacities,
|
||||
configuration);
|
||||
configuration, configuredNodeLabels);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -360,7 +362,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
|
||||
writeLock.lock();
|
||||
try {
|
||||
if (isDynamicQueue()) {
|
||||
if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
|
||||
setDynamicQueueProperties(configuration);
|
||||
}
|
||||
// get labels
|
||||
|
@ -386,6 +388,17 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||
}
|
||||
|
||||
if (csContext.getCapacitySchedulerQueueManager() != null
|
||||
&& csContext.getCapacitySchedulerQueueManager()
|
||||
.getConfiguredNodeLabels() != null) {
|
||||
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
||||
.getConfiguredNodeLabels().getLabelsByQueue(getQueuePath());
|
||||
} else {
|
||||
// Fallback to suboptimal but correct logic
|
||||
this.configuredNodeLabels = csContext.getConfiguration()
|
||||
.getConfiguredNodeLabels(queuePath);
|
||||
}
|
||||
|
||||
// After we setup labels, we can setup capacities
|
||||
setupConfigurableCapacities(configuration);
|
||||
updateAbsoluteCapacities();
|
||||
|
@ -487,6 +500,19 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
if (getParent() instanceof ParentQueue) {
|
||||
((ParentQueue) getParent()).getAutoCreatedQueueTemplate()
|
||||
.setTemplateEntriesForChild(configuration, getQueuePath());
|
||||
|
||||
String parentTemplate = String.format("%s.%s", getParent().getQueuePath(),
|
||||
AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX);
|
||||
parentTemplate = parentTemplate.substring(0, parentTemplate.lastIndexOf(
|
||||
DOT));
|
||||
Set<String> parentNodeLabels = csContext
|
||||
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
||||
.getLabelsByQueue(parentTemplate);
|
||||
|
||||
if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
|
||||
csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
||||
.setLabelsByQueue(queuePath, new HashSet<>(parentNodeLabels));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -571,10 +597,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
|
||||
protected void updateConfigurableResourceRequirement(String queuePath,
|
||||
Resource clusterResource) {
|
||||
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||
Set<String> configuredNodelabels = conf.getConfiguredNodeLabels(queuePath);
|
||||
|
||||
for (String label : configuredNodelabels) {
|
||||
for (String label : configuredNodeLabels) {
|
||||
Resource minResource = getMinimumAbsoluteResource(queuePath, label);
|
||||
Resource maxResource = getMaximumAbsoluteResource(queuePath, label);
|
||||
|
||||
|
@ -1578,9 +1601,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
}
|
||||
|
||||
void updateEffectiveResources(Resource clusterResource) {
|
||||
Set<String> configuredNodelabels =
|
||||
csContext.getConfiguration().getConfiguredNodeLabels(getQueuePath());
|
||||
for (String label : configuredNodelabels) {
|
||||
for (String label : configuredNodeLabels) {
|
||||
Resource resourceByLabel = labelManager.getResourceByLabel(label,
|
||||
clusterResource);
|
||||
|
||||
|
@ -1715,5 +1736,4 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||
|
||||
|
@ -28,6 +27,8 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Leaf queues which are auto created by an underlying implementation of
|
||||
|
@ -122,6 +123,23 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setDynamicQueueProperties(
|
||||
CapacitySchedulerConfiguration configuration) {
|
||||
String parentTemplate = String.format("%s.%s", getParent().getQueuePath(),
|
||||
CapacitySchedulerConfiguration
|
||||
.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX);
|
||||
Set<String> parentNodeLabels = csContext
|
||||
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
||||
.getLabelsByQueue(parentTemplate);
|
||||
|
||||
if (parentNodeLabels != null && parentNodeLabels.size() > 1) {
|
||||
csContext.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
||||
.setLabelsByQueue(getQueuePath(),
|
||||
new HashSet<>(parentNodeLabels));
|
||||
}
|
||||
}
|
||||
|
||||
private void validate(final CSQueue newlyParsedQueue) throws IOException {
|
||||
if (!(newlyParsedQueue instanceof AutoCreatedLeafQueue) || !newlyParsedQueue
|
||||
.getQueuePath().equals(getQueuePath())) {
|
||||
|
|
|
@ -66,13 +66,12 @@ public class CSQueueUtils {
|
|||
return (parentAbsMaxCapacity * maximumCapacity);
|
||||
}
|
||||
|
||||
public static void loadCapacitiesByLabelsFromConf(String queuePath,
|
||||
QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
|
||||
public static void loadCapacitiesByLabelsFromConf(
|
||||
String queuePath, QueueCapacities queueCapacities,
|
||||
CapacitySchedulerConfiguration csConf, Set<String> nodeLabels) {
|
||||
queueCapacities.clearConfigurableFields();
|
||||
Set<String> configuredNodelabels =
|
||||
csConf.getConfiguredNodeLabels(queuePath);
|
||||
|
||||
for (String label : configuredNodelabels) {
|
||||
for (String label : nodeLabels) {
|
||||
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
|
||||
queueCapacities.setCapacity(label,
|
||||
csConf.getNonLabeledQueueCapacity(queuePath) / 100);
|
||||
|
|
|
@ -1564,6 +1564,42 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
return configuredNodeLabels;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get configured node labels for all queues that have accessible-node-labels
|
||||
* prefixed properties set.
|
||||
* @return configured node labels
|
||||
*/
|
||||
public Map<String, Set<String>> getConfiguredNodeLabelsByQueue() {
|
||||
Map<String, Set<String>> labelsByQueue = new HashMap<>();
|
||||
Map<String, String> schedulerEntries = getPropsWithPrefix(
|
||||
CapacitySchedulerConfiguration.PREFIX);
|
||||
|
||||
for (Map.Entry<String, String> propertyEntry
|
||||
: schedulerEntries.entrySet()) {
|
||||
String key = propertyEntry.getKey();
|
||||
// Consider all keys that has accessible-node-labels prefix, excluding
|
||||
// <queue-path>.accessible-node-labels itself
|
||||
if (key.contains(ACCESSIBLE_NODE_LABELS + DOT)) {
|
||||
// Find <label-name> in
|
||||
// <queue-path>.accessible-node-labels.<label-name>.property
|
||||
int labelStartIdx =
|
||||
key.indexOf(ACCESSIBLE_NODE_LABELS)
|
||||
+ ACCESSIBLE_NODE_LABELS.length() + 1;
|
||||
int labelEndIndx = key.indexOf('.', labelStartIdx);
|
||||
String labelName = key.substring(labelStartIdx, labelEndIndx);
|
||||
// Find queuePath and exclude "." at the end
|
||||
String queuePath = key.substring(0, key.indexOf(
|
||||
ACCESSIBLE_NODE_LABELS) - 1);
|
||||
if (!labelsByQueue.containsKey(queuePath)) {
|
||||
labelsByQueue.put(queuePath, new HashSet<>());
|
||||
labelsByQueue.get(queuePath).add(RMNodeLabelsManager.NO_LABEL);
|
||||
}
|
||||
labelsByQueue.get(queuePath).add(labelName);
|
||||
}
|
||||
}
|
||||
return labelsByQueue;
|
||||
}
|
||||
|
||||
public Integer getDefaultApplicationPriorityConfPerQueue(String queue) {
|
||||
Integer defaultPriority = getInt(getQueuePrefix(queue)
|
||||
+ DEFAULT_APPLICATION_PRIORITY,
|
||||
|
|
|
@ -84,6 +84,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
|
||||
private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
|
||||
queueStateManager;
|
||||
private ConfiguredNodeLabels configuredNodeLabels;
|
||||
|
||||
/**
|
||||
* Construct the service.
|
||||
|
@ -98,6 +99,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
this.labelManager = labelManager;
|
||||
this.queueStateManager = new QueueStateManager<>();
|
||||
this.appPriorityACLManager = appPriorityACLManager;
|
||||
this.configuredNodeLabels = new ConfiguredNodeLabels();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -165,6 +167,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
*/
|
||||
public void initializeQueues(CapacitySchedulerConfiguration conf)
|
||||
throws IOException {
|
||||
configuredNodeLabels = new ConfiguredNodeLabels(conf);
|
||||
root = parseQueue(this.csContext, conf, null,
|
||||
CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
|
||||
setQueueAcls(authorizer, appPriorityACLManager, queues);
|
||||
|
@ -180,6 +183,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
throws IOException {
|
||||
// Parse new queues
|
||||
CSQueueStore newQueues = new CSQueueStore();
|
||||
configuredNodeLabels = new ConfiguredNodeLabels(newConf);
|
||||
CSQueue newRoot = parseQueue(this.csContext, newConf, null,
|
||||
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
|
||||
|
||||
|
@ -619,6 +623,15 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
|
|||
return parentsToCreate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get {@code ConfiguredNodeLabels} which contains the configured node labels
|
||||
* for all queues.
|
||||
* @return configured node labels
|
||||
*/
|
||||
public ConfiguredNodeLabels getConfiguredNodeLabels() {
|
||||
return configuredNodeLabels;
|
||||
}
|
||||
|
||||
private LeafQueue createAutoQueue(ApplicationPlacementContext queue)
|
||||
throws SchedulerDynamicEditException {
|
||||
List<String> parentsToCreate = determineMissingParents(queue);
|
||||
|
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Contains node labels for all queues extracted from configuration properties.
|
||||
* A queue has a configured node label if it has a property set with an
|
||||
* accessible-node-labels prefix.
|
||||
* Example:
|
||||
* yarn.scheduler.capacity.root.accessible-node-labels.test-label.capacity
|
||||
*/
|
||||
public class ConfiguredNodeLabels {
|
||||
private final Map<String, Set<String>> configuredNodeLabelsByQueue;
|
||||
private static final Set<String> NO_LABEL =
|
||||
ImmutableSet.of(RMNodeLabelsManager.NO_LABEL);
|
||||
|
||||
public ConfiguredNodeLabels() {
|
||||
configuredNodeLabelsByQueue = new HashMap<>();
|
||||
}
|
||||
|
||||
public ConfiguredNodeLabels(
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
this.configuredNodeLabelsByQueue = conf.getConfiguredNodeLabelsByQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a set of configured node labels for a queue. If no labels are set
|
||||
* for a queue, it defaults to a one element immutable collection containing
|
||||
* empty label.
|
||||
* @param queuePath path of the queue
|
||||
* @return configured node labels or an immutable set containing the empty
|
||||
* label
|
||||
*/
|
||||
public Set<String> getLabelsByQueue(String queuePath) {
|
||||
Set<String> labels = configuredNodeLabelsByQueue.get(queuePath);
|
||||
|
||||
if (labels == null) {
|
||||
return NO_LABEL;
|
||||
}
|
||||
|
||||
return ImmutableSet.copyOf(labels);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set node labels for a specific queue.
|
||||
* @param queuePath path of the queue
|
||||
* @param nodeLabels configured node labels to set
|
||||
*/
|
||||
public void setLabelsByQueue(
|
||||
String queuePath, Collection<String> nodeLabels) {
|
||||
configuredNodeLabelsByQueue.put(queuePath, new HashSet<>(nodeLabels));
|
||||
}
|
||||
}
|
|
@ -37,6 +37,7 @@ import java.util.Collections;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Auto Creation enabled Parent queue. This queue initially does not have any
|
||||
|
@ -164,10 +165,13 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
|
|||
CapacitySchedulerConfiguration conf =
|
||||
super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix);
|
||||
builder.configuration(conf);
|
||||
String templateQueuePath = csContext.getConfiguration()
|
||||
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
|
||||
|
||||
for (String nodeLabel : conf
|
||||
.getConfiguredNodeLabels(csContext.getConfiguration()
|
||||
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()))) {
|
||||
Set<String> templateConfiguredNodeLabels = csContext
|
||||
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
||||
.getLabelsByQueue(templateQueuePath);
|
||||
for (String nodeLabel : templateConfiguredNodeLabels) {
|
||||
Resource templateMinResource = conf.getMinimumResourceRequirement(
|
||||
nodeLabel, csContext.getConfiguration()
|
||||
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
|
||||
|
@ -182,10 +186,10 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
|
|||
|
||||
//Load template capacities
|
||||
QueueCapacities queueCapacities = new QueueCapacities(false);
|
||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(csContext.getConfiguration()
|
||||
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
|
||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(templateQueuePath,
|
||||
queueCapacities,
|
||||
csContext.getConfiguration());
|
||||
csContext.getConfiguration(),
|
||||
templateConfiguredNodeLabels);
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
|
@ -1258,9 +1257,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
}
|
||||
|
||||
// Update effective capacity in all parent queue.
|
||||
Set<String> configuredNodelabels = csContext.getConfiguration()
|
||||
.getConfiguredNodeLabels(getQueuePath());
|
||||
for (String label : configuredNodelabels) {
|
||||
for (String label : configuredNodeLabels) {
|
||||
calculateEffectiveResourcesAndCapacity(label, clusterResource);
|
||||
}
|
||||
|
||||
|
|
|
@ -706,6 +706,21 @@ public class TestCapacitySchedulerNewQueueAutoCreation
|
|||
LeafQueue a3 = createQueue("root.a.a3");
|
||||
Assert.assertFalse("auto queue deletion should be turned off on a3",
|
||||
a3.isEligibleForAutoDeletion());
|
||||
|
||||
// Set the capacity of label TEST
|
||||
csConf.set(AutoCreatedQueueTemplate.getAutoQueueTemplatePrefix(
|
||||
"root.c") + "accessible-node-labels.TEST.capacity", "6w");
|
||||
csConf.setQueues("root", new String[]{"a", "b", "c"});
|
||||
csConf.setAutoQueueCreationV2Enabled("root.c", true);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
LeafQueue c1 = createQueue("root.c.c1");
|
||||
Assert.assertEquals("weight is not set for label TEST", 6f,
|
||||
c1.getQueueCapacities().getWeight("TEST"), 1e-6);
|
||||
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||
c1 = (LeafQueue) cs.getQueue("root.c.c1");
|
||||
Assert.assertEquals("weight is not set for label TEST", 6f,
|
||||
c1.getQueueCapacities().getWeight("TEST"), 1e-6);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -60,7 +60,12 @@ public class TestReservationQueue {
|
|||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
csContext = mock(CapacitySchedulerContext.class);
|
||||
CapacitySchedulerQueueManager csQm = mock(
|
||||
CapacitySchedulerQueueManager.class);
|
||||
ConfiguredNodeLabels labels = new ConfiguredNodeLabels(csConf);
|
||||
when(csQm.getConfiguredNodeLabels()).thenReturn(labels);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getCapacitySchedulerQueueManager()).thenReturn(csQm);
|
||||
when(csContext.getConf()).thenReturn(conf);
|
||||
when(csContext.getMinimumResourceCapability()).thenReturn(
|
||||
Resources.createResource(GB, 1));
|
||||
|
|
Loading…
Reference in New Issue