YARN-10564. Support Auto Queue Creation template configurations. Contributed by Andras Gyori.

This commit is contained in:
Peter Bacsko 2021-04-08 12:42:48 +02:00
parent 46a5979805
commit ca9aa91d10
6 changed files with 375 additions and 18 deletions

View File

@ -18,25 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -58,13 +48,13 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
@ -74,8 +64,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleC
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
@ -361,6 +360,10 @@ public abstract class AbstractCSQueue implements CSQueue {
writeLock.lock();
try {
if (isDynamicQueue() && getParent() instanceof ParentQueue) {
((ParentQueue) getParent()).getAutoCreatedQueueTemplate()
.setTemplateEntriesForChild(configuration, getQueuePath());
}
// get labels
this.accessibleLabels =
configuration.getAccessibleNodeLabels(getQueuePath());

View File

@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_QUEUE_CREATION_V2_PREFIX;
/**
* A handler for storing and setting auto created queue template settings.
*/
public class AutoCreatedQueueTemplate {
public static final String AUTO_QUEUE_TEMPLATE_PREFIX =
AUTO_QUEUE_CREATION_V2_PREFIX + "template.";
private static final String WILDCARD_QUEUE = "*";
private static final int MAX_WILDCARD_LEVEL = 1;
private final Map<String, String> templateProperties = new HashMap<>();
public AutoCreatedQueueTemplate(Configuration configuration,
String queuePath) {
setTemplateConfigEntries(configuration, queuePath);
}
@VisibleForTesting
public static String getAutoQueueTemplatePrefix(String queue) {
return CapacitySchedulerConfiguration.getQueuePrefix(queue)
+ AUTO_QUEUE_TEMPLATE_PREFIX;
}
/**
* Get the template properties attached to a parent queue.
* @return template property names and values
*/
public Map<String, String> getTemplateProperties() {
return templateProperties;
}
/**
* Sets the configuration properties of a child queue based on its parent
* template settings.
* @param conf configuration to set
* @param childQueuePath child queue path used for prefixing the properties
*/
public void setTemplateEntriesForChild(Configuration conf,
String childQueuePath) {
// Get all properties that are explicitly set
Set<String> alreadySetProps = conf.getPropsWithPrefix(
CapacitySchedulerConfiguration.getQueuePrefix(childQueuePath)).keySet();
for (Map.Entry<String, String> entry : templateProperties.entrySet()) {
// Do not overwrite explicitly configured properties
if (alreadySetProps.contains(entry.getKey())) {
continue;
}
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(
childQueuePath) + entry.getKey(), entry.getValue());
}
}
/**
* Store the template configuration properties. Explicit templates always take
* precedence over wildcard values. An example template precedence
* hierarchy for root.a ParentQueue from highest to lowest:
* yarn.scheduler.capacity.root.a.auto-queue-creation-v2.template.capacity
* yarn.scheduler.capacity.root.*.auto-queue-creation-v2.template.capacity
*/
private void setTemplateConfigEntries(Configuration configuration,
String queuePath) {
List<String> queuePathParts = new ArrayList<>(Arrays.asList(
queuePath.split("\\.")));
if (queuePathParts.size() <= 1) {
// This is either root or an empty queue name
return;
}
int queuePathMaxIndex = queuePathParts.size() - 1;
// start with the most explicit format (without wildcard)
int wildcardLevel = 0;
// root can not be wildcarded
// MAX_WILDCARD_LEVEL will be configurable in the future
int supportedWildcardLevel = Math.min(queuePathMaxIndex - 1,
MAX_WILDCARD_LEVEL);
// Collect all template entries
while (wildcardLevel <= supportedWildcardLevel) {
// Get all config entries with the specified prefix
String templateQueuePath = String.join(".", queuePathParts);
// Get all configuration entries with
// <queuePath>.auto-queue-creation-v2.template prefix
Map<String, String> props = configuration.getPropsWithPrefix(
getAutoQueueTemplatePrefix(templateQueuePath));
for (Map.Entry<String, String> entry : props.entrySet()) {
// If an entry is already present, it had a higher precedence
templateProperties.putIfAbsent(entry.getKey(), entry.getValue());
}
// Replace a queue part with a wildcard based on the wildcard level
// eg. root.a -> root.*
int queuePartToWildcard = queuePathMaxIndex - wildcardLevel;
queuePathParts.set(queuePartToWildcard, WILDCARD_QUEUE);
++wildcardLevel;
}
}
}

View File

@ -2029,7 +2029,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled";
@Private
private static final String AUTO_QUEUE_CREATION_V2_PREFIX =
protected static final String AUTO_QUEUE_CREATION_V2_PREFIX =
"auto-queue-creation-v2.";
@Private

View File

@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.slf4j.Logger;
@ -101,6 +102,8 @@ public class ParentQueue extends AbstractCSQueue {
private final boolean allowZeroCapacitySum;
private AutoCreatedQueueTemplate autoCreatedQueueTemplate;
// effective min ratio per resource, it is used during updateClusterResource,
// leaf queue can use this to calculate effective resources.
// This field will not be edited, reference will point to a new immutable map
@ -152,6 +155,8 @@ public class ParentQueue extends AbstractCSQueue {
throws IOException {
writeLock.lock();
try {
autoCreatedQueueTemplate = new AutoCreatedQueueTemplate(
csConf, getQueuePath());
super.setupQueueConfigs(clusterResource, csConf);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
@ -477,6 +482,8 @@ public class ParentQueue extends AbstractCSQueue {
CapacitySchedulerConfiguration dupCSConfig =
new CapacitySchedulerConfiguration(
csContext.getConfiguration(), false);
autoCreatedQueueTemplate.setTemplateEntriesForChild(dupCSConfig,
childQueuePath);
if (isLeaf) {
// set to -1, to disable it
dupCSConfig.setUserLimitFactor(childQueuePath, -1);
@ -647,6 +654,18 @@ public class ParentQueue extends AbstractCSQueue {
Map<String, CSQueue> currentChildQueues = getQueuesMap(childQueues);
Map<String, CSQueue> newChildQueues = getQueuesMap(
newlyParsedParentQueue.childQueues);
// Reinitialize dynamic queues as well, because they are not parsed
for (String queue : Sets.difference(currentChildQueues.keySet(),
newChildQueues.keySet())) {
CSQueue candidate = currentChildQueues.get(queue);
if (candidate instanceof AbstractCSQueue) {
if (((AbstractCSQueue) candidate).isDynamicQueue()) {
candidate.reinitialize(candidate, clusterResource);
}
}
}
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
String newChildQueueName = e.getKey();
CSQueue newChildQueue = e.getValue();
@ -1217,7 +1236,9 @@ public class ParentQueue extends AbstractCSQueue {
// For dynamic queue, we will set weight to 1 every time, because it
// is possible new labels added to the parent.
if (((AbstractCSQueue) queue).isDynamicQueue()) {
queue.getQueueCapacities().setWeight(nodeLabel, 1f);
if (queue.getQueueCapacities().getWeight(nodeLabel) == -1f) {
queue.getQueueCapacities().setWeight(nodeLabel, 1f);
}
}
}
}
@ -1637,4 +1658,8 @@ public class ParentQueue extends AbstractCSQueue {
csContext.getConfiguration().
isAutoExpiredDeletionEnabled(this.getQueuePath());
}
public AutoCreatedQueueTemplate getAutoCreatedQueueTemplate() {
return autoCreatedQueueTemplate;
}
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestAutoCreatedQueueTemplate {
private static final String TEST_QUEUE_ABC = "root.a.b.c";
private static final String TEST_QUEUE_AB = "root.a.b";
private static final String TEST_QUEUE_A = "root.a";
private static final String ROOT = "root";
private CapacitySchedulerConfiguration conf;
@Before
public void setUp() throws Exception {
conf = new CapacitySchedulerConfiguration();
conf.setQueues("root", new String[]{"a"});
conf.setQueues("a", new String[]{"b"});
conf.setQueues("b", new String[]{"c"});
}
@Test
public void testNonWildCardTemplate() {
conf.set(getTemplateKey(TEST_QUEUE_AB, "capacity"), "6w");
AutoCreatedQueueTemplate template =
new AutoCreatedQueueTemplate(conf, TEST_QUEUE_AB);
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC);
Assert.assertEquals("weight is not set", 6f,
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
}
@Test
public void testOneLevelWildcardTemplate() {
conf.set(getTemplateKey("root.a.*", "capacity"), "6w");
AutoCreatedQueueTemplate template =
new AutoCreatedQueueTemplate(conf, TEST_QUEUE_AB);
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC);
Assert.assertEquals("weight is not set", 6f,
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
}
@Test
public void testIgnoredWhenRootWildcarded() {
conf.set(getTemplateKey("*", "capacity"), "6w");
AutoCreatedQueueTemplate template =
new AutoCreatedQueueTemplate(conf, ROOT);
template.setTemplateEntriesForChild(conf, TEST_QUEUE_A);
Assert.assertEquals("weight is set", -1f,
conf.getNonLabeledQueueWeight(TEST_QUEUE_A), 10e-6);
}
@Test
public void testIgnoredWhenNoParent() {
conf.set(getTemplateKey("root", "capacity"), "6w");
AutoCreatedQueueTemplate template =
new AutoCreatedQueueTemplate(conf, ROOT);
template.setTemplateEntriesForChild(conf, ROOT);
Assert.assertEquals("weight is set", -1f,
conf.getNonLabeledQueueWeight(ROOT), 10e-6);
}
@Test
public void testTemplatePrecedence() {
conf.set(getTemplateKey("root.a.b", "capacity"), "6w");
conf.set(getTemplateKey("root.a.*", "capacity"), "4w");
conf.set(getTemplateKey("root.*.*", "capacity"), "2w");
AutoCreatedQueueTemplate template =
new AutoCreatedQueueTemplate(conf, TEST_QUEUE_AB);
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC);
Assert.assertEquals(
"explicit template does not have the highest precedence", 6f,
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration();
newConf.set(getTemplateKey("root.a.*", "capacity"), "4w");
template =
new AutoCreatedQueueTemplate(newConf, TEST_QUEUE_AB);
template.setTemplateEntriesForChild(newConf, TEST_QUEUE_ABC);
Assert.assertEquals("precedence is invalid", 4f,
newConf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
}
private String getTemplateKey(String queuePath, String entryKey) {
return CapacitySchedulerConfiguration.getQueuePrefix(queuePath)
+ AutoCreatedQueueTemplate.AUTO_QUEUE_TEMPLATE_PREFIX + entryKey;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -647,6 +648,85 @@ public class TestCapacitySchedulerNewQueueAutoCreation
}
}
@Test
public void testAutoCreatedQueueTemplateConfig() throws Exception {
startScheduler();
csConf.set(AutoCreatedQueueTemplate.getAutoQueueTemplatePrefix(
"root.a.*") + "capacity", "6w");
cs.reinitialize(csConf, mockRM.getRMContext());
LeafQueue a2 = createQueue("root.a.a-auto.a2");
Assert.assertEquals("weight is not set by template", 6f,
a2.getQueueCapacities().getWeight(), 1e-6);
cs.reinitialize(csConf, mockRM.getRMContext());
a2 = (LeafQueue) cs.getQueue("root.a.a-auto.a2");
Assert.assertEquals("weight is overridden", 6f,
a2.getQueueCapacities().getWeight(), 1e-6);
csConf.setNonLabeledQueueWeight("root.a.a-auto.a2", 4f);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals("weight is not explicitly set", 4f,
a2.getQueueCapacities().getWeight(), 1e-6);
}
@Test
public void testAutoCreatedQueueConfigChange() throws Exception {
startScheduler();
LeafQueue a2 = createQueue("root.a.a-auto.a2");
csConf.setNonLabeledQueueWeight("root.a.a-auto.a2", 4f);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals("weight is not explicitly set", 4f,
a2.getQueueCapacities().getWeight(), 1e-6);
a2 = (LeafQueue) cs.getQueue("root.a.a-auto.a2");
csConf.setState("root.a.a-auto.a2", QueueState.STOPPED);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals("root.a.a-auto.a2 has not been stopped",
QueueState.STOPPED, a2.getState());
csConf.setState("root.a.a-auto.a2", QueueState.RUNNING);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals("root.a.a-auto.a2 is not running",
QueueState.RUNNING, a2.getState());
}
@Test
public void testAutoCreateQueueState() throws Exception {
startScheduler();
createQueue("root.e.e1");
csConf.setState("root.e", QueueState.STOPPED);
csConf.setState("root.e.e1", QueueState.STOPPED);
csConf.setState("root.a", QueueState.STOPPED);
cs.reinitialize(csConf, mockRM.getRMContext());
// Make sure the static queue is stopped
Assert.assertEquals(cs.getQueue("root.a").getState(),
QueueState.STOPPED);
// If not set, default is the queue state of parent
Assert.assertEquals(cs.getQueue("root.a.a1").getState(),
QueueState.STOPPED);
Assert.assertEquals(cs.getQueue("root.e").getState(),
QueueState.STOPPED);
Assert.assertEquals(cs.getQueue("root.e.e1").getState(),
QueueState.STOPPED);
// Make root.e state to RUNNING
csConf.setState("root.e", QueueState.RUNNING);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals(cs.getQueue("root.e.e1").getState(),
QueueState.STOPPED);
// Make root.e.e1 state to RUNNING
csConf.setState("root.e.e1", QueueState.RUNNING);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals(cs.getQueue("root.e.e1").getState(),
QueueState.RUNNING);
}
@Test
public void testAutoQueueCreationDepthLimitFromStaticParent()
throws Exception {