YARN-10930. Introduce universal capacity resource vector. Contributed by Andras Gyori

This commit is contained in:
9uapaw 2021-10-22 17:32:33 +02:00 committed by Szilard Nemeth
parent 23772d946b
commit 32ecaed9c3
9 changed files with 1108 additions and 3 deletions

View File

@ -131,6 +131,8 @@ public abstract class AbstractCSQueue implements CSQueue {
protected CapacityConfigType capacityConfigType =
CapacityConfigType.NONE;
protected Map<String, QueueCapacityVector> configuredCapacityVectors;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext;
@ -374,6 +376,8 @@ public abstract class AbstractCSQueue implements CSQueue {
this.reservationsContinueLooking =
configuration.getReservationContinueLook();
this.configuredCapacityVectors = csContext.getConfiguration()
.parseConfiguredResourceVector(queuePath, configuredNodeLabels);
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@ -688,6 +692,12 @@ public abstract class AbstractCSQueue implements CSQueue {
minimumAllocation);
}
@Override
public QueueCapacityVector getConfiguredCapacityVector(
String label) {
return configuredCapacityVectors.get(label);
}
private void initializeQueueState(CapacitySchedulerConfiguration configuration) {
QueueState previousState = getState();
QueueState configuredState = configuration
@ -978,7 +988,7 @@ public abstract class AbstractCSQueue implements CSQueue {
"Default lifetime " + defaultAppLifetime
+ " can't exceed maximum lifetime " + myMaxAppLifetime);
}
if (defaultAppLifetime <= 0) {
defaultAppLifetime = myMaxAppLifetime;
}

View File

@ -419,6 +419,14 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
*/
Resource getEffectiveCapacity(String label);
/**
* Get configured capacity resource vector parsed from the capacity config
* of the queue.
* @param label node label (partition)
* @return capacity resource vector
*/
QueueCapacityVector getConfiguredCapacityVector(String label);
/**
* Get effective capacity of queue. If min/max resource is configured,
* preference will be given to absolute configuration over normal capacity.

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule.MappingRule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueCapacityConfigParser;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,9 +74,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@ -413,6 +414,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final String MAPPING_RULE_FORMAT_DEFAULT =
MAPPING_RULE_FORMAT_LEGACY;
private static final QueueCapacityConfigParser queueCapacityConfigParser
= new QueueCapacityConfigParser();
private ConfigurationProperties configurationProperties;
/**
@ -454,7 +459,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return PREFIX + "user." + user + DOT;
}
private String getNodeLabelPrefix(String queue, String label) {
public static String getNodeLabelPrefix(String queue, String label) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
return getQueuePrefix(queue);
}
@ -2571,6 +2576,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
}
public Map<String, QueueCapacityVector> parseConfiguredResourceVector(
String queuePath, Set<String> labels) {
Map<String, QueueCapacityVector> queueResourceVectors = new HashMap<>();
for (String label : labels) {
queueResourceVectors.put(label, queueCapacityConfigParser.parse(this, queuePath, label));
}
return queueResourceVectors;
}
private void updateMinMaxResourceToConf(String label, String queue,
Resource resource, String type) {
if (queue.equals("root")) {

View File

@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
* Contains capacity values with calculation types associated for each
* resource.
*/
public class QueueCapacityVector implements
Iterable<QueueCapacityVector.QueueCapacityVectorEntry> {
private static final String START_PARENTHESES = "[";
private static final String END_PARENTHESES = "]";
private static final String RESOURCE_DELIMITER = ",";
private static final String VALUE_DELIMITER = "=";
private final ResourceVector resource;
private final Map<String, QueueCapacityType> capacityTypes
= new HashMap<>();
private final Map<QueueCapacityType, Set<String>> capacityTypePerResource
= new HashMap<>();
public QueueCapacityVector() {
this.resource = new ResourceVector();
}
private QueueCapacityVector(ResourceVector resource) {
this.resource = resource;
}
/**
* Creates a zero {@code QueueCapacityVector}. The resources are defined
* in absolute capacity type by default.
*
* @return zero capacity vector
*/
public static QueueCapacityVector newInstance() {
QueueCapacityVector newCapacityVector =
new QueueCapacityVector(ResourceVector.newInstance());
for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
newCapacityVector.storeResourceType(resourceEntry.getKey(),
QueueCapacityType.ABSOLUTE);
}
return newCapacityVector;
}
/**
* Creates a uniform and homogeneous {@code QueueCapacityVector}.
* The resources are defined in absolute capacity type by default.
*
* @param value value to be set for each resource
* @param capacityType capacity type to be set for each resource
* @return uniform capacity vector
*/
public static QueueCapacityVector of(
float value, QueueCapacityType capacityType) {
QueueCapacityVector newCapacityVector =
new QueueCapacityVector(ResourceVector.of(value));
for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
newCapacityVector.storeResourceType(resourceEntry.getKey(), capacityType);
}
return newCapacityVector;
}
public QueueCapacityVectorEntry getResource(String resourceName) {
return new QueueCapacityVectorEntry(capacityTypes.get(resourceName),
resourceName, resource.getValue(resourceName));
}
/**
* Returns the number of resources defined for this vector.
*
* @return number of resources
*/
public int getResourceCount() {
return capacityTypes.size();
}
/**
* Set the value and capacity type of a resource.
*
* @param resourceName name of the resource
* @param value value of the resource
* @param capacityType type of the resource
*/
public void setResource(String resourceName, float value,
QueueCapacityType capacityType) {
// Necessary due to backward compatibility (memory = memory-mb)
String convertedResourceName = resourceName;
if (resourceName.equals("memory")) {
convertedResourceName = ResourceInformation.MEMORY_URI;
}
resource.setValue(convertedResourceName, value);
storeResourceType(convertedResourceName, capacityType);
}
/**
* A shorthand to retrieve the value stored for the memory resource.
*
* @return value of memory resource
*/
public float getMemory() {
return resource.getValue(ResourceInformation.MEMORY_URI);
}
/**
* Returns the name of all resources that are defined in the given capacity
* type.
*
* @param capacityType the capacity type of the resources
* @return all resource names for the given capacity type
*/
public Set<String> getResourceNamesByCapacityType(
QueueCapacityType capacityType) {
return capacityTypePerResource.getOrDefault(capacityType,
Collections.emptySet());
}
public boolean isResourceOfType(
String resourceName, QueueCapacityType capacityType) {
return capacityTypes.containsKey(resourceName) &&
capacityTypes.get(resourceName).equals(capacityType);
}
@Override
public Iterator<QueueCapacityVectorEntry> iterator() {
return new Iterator<QueueCapacityVectorEntry>() {
private final Iterator<Map.Entry<String, Float>> resources =
resource.iterator();
private int i = 0;
@Override
public boolean hasNext() {
return resources.hasNext() && capacityTypes.size() > i;
}
@Override
public QueueCapacityVectorEntry next() {
Map.Entry<String, Float> resourceInformation = resources.next();
i++;
return new QueueCapacityVectorEntry(
capacityTypes.get(resourceInformation.getKey()),
resourceInformation.getKey(), resourceInformation.getValue());
}
};
}
/**
* Returns a set of all capacity type defined for this vector.
*
* @return capacity types
*/
public Set<QueueCapacityType> getDefinedCapacityTypes() {
return capacityTypePerResource.keySet();
}
private void storeResourceType(
String resourceName, QueueCapacityType resourceType) {
if (capacityTypes.get(resourceName) != null
&& !capacityTypes.get(resourceName).equals(resourceType)) {
capacityTypePerResource.get(capacityTypes.get(resourceName))
.remove(resourceName);
}
capacityTypePerResource.putIfAbsent(resourceType, new HashSet<>());
capacityTypePerResource.get(resourceType).add(resourceName);
capacityTypes.put(resourceName, resourceType);
}
@Override
public String toString() {
StringBuilder stringVector = new StringBuilder();
stringVector.append(START_PARENTHESES);
int resourceCount = 0;
for (Map.Entry<String, Float> resourceEntry : resource) {
resourceCount++;
stringVector.append(resourceEntry.getKey())
.append(VALUE_DELIMITER)
.append(resourceEntry.getValue())
.append(capacityTypes.get(resourceEntry.getKey()).postfix);
if (resourceCount < capacityTypes.size()) {
stringVector.append(RESOURCE_DELIMITER);
}
}
stringVector.append(END_PARENTHESES);
return stringVector.toString();
}
/**
* Represents a capacity type associated with its syntax postfix.
*/
public enum QueueCapacityType {
PERCENTAGE("%"), ABSOLUTE(""), WEIGHT("w");
private final String postfix;
QueueCapacityType(String postfix) {
this.postfix = postfix;
}
public String getPostfix() {
return postfix;
}
}
public static class QueueCapacityVectorEntry {
private final QueueCapacityType vectorResourceType;
private final float resourceValue;
private final String resourceName;
public QueueCapacityVectorEntry(QueueCapacityType vectorResourceType,
String resourceName, float resourceValue) {
this.vectorResourceType = vectorResourceType;
this.resourceValue = resourceValue;
this.resourceName = resourceName;
}
public QueueCapacityType getVectorResourceType() {
return vectorResourceType;
}
public float getResourceValue() {
return resourceValue;
}
public String getResourceName() {
return resourceName;
}
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
* Represents a simple resource floating point value storage
* grouped by resource names.
*/
public class ResourceVector implements Iterable<Map.Entry<String, Float>> {
private final Map<String, Float> resourcesByName = new HashMap<>();
/**
* Creates a new {@code ResourceVector} with all pre-defined resources set to
* zero.
* @return zero resource vector
*/
public static ResourceVector newInstance() {
ResourceVector zeroResourceVector = new ResourceVector();
for (ResourceInformation resource : ResourceUtils.getResourceTypesArray()) {
zeroResourceVector.setValue(resource.getName(), 0);
}
return zeroResourceVector;
}
/**
* Creates a new {@code ResourceVector} with all pre-defined resources set to
* the same value.
* @param value the value to set all resources to
* @return uniform resource vector
*/
public static ResourceVector of(float value) {
ResourceVector emptyResourceVector = new ResourceVector();
for (ResourceInformation resource : ResourceUtils.getResourceTypesArray()) {
emptyResourceVector.setValue(resource.getName(), value);
}
return emptyResourceVector;
}
/**
* Creates a new {@code ResourceVector} with the values set in a
* {@code Resource} object.
* @param resource resource object the resource vector will be based on
* @return uniform resource vector
*/
public static ResourceVector of(Resource resource) {
ResourceVector resourceVector = new ResourceVector();
for (ResourceInformation resourceInformation : resource.getResources()) {
resourceVector.setValue(resourceInformation.getName(),
resourceInformation.getValue());
}
return resourceVector;
}
/**
* Subtract values for each resource defined in the given resource vector.
* @param otherResourceVector rhs resource vector of the subtraction
*/
public void subtract(ResourceVector otherResourceVector) {
for (Map.Entry<String, Float> resource : otherResourceVector) {
setValue(resource.getKey(), getValue(resource.getKey()) - resource.getValue());
}
}
/**
* Increments the given resource by the specified value.
* @param resourceName name of the resource
* @param value value to be added to the resource's current value
*/
public void increment(String resourceName, float value) {
setValue(resourceName, getValue(resourceName) + value);
}
public Float getValue(String resourceName) {
return resourcesByName.get(resourceName);
}
public void setValue(String resourceName, float value) {
resourcesByName.put(resourceName, value);
}
@Override
public Iterator<Map.Entry<String, Float>> iterator() {
return resourcesByName.entrySet().iterator();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
return this.resourcesByName.equals(((ResourceVector) o).resourcesByName);
}
@Override
public int hashCode() {
return resourcesByName.hashCode();
}
}

View File

@ -0,0 +1,215 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A class that parses {@code QueueCapacityVector} from the capacity
* configuration property set for a queue.
*
* A new syntax for capacity property could be implemented, by creating a parser
* with a regex to match the pattern and a method that creates a
* {@code QueueCapacityVector} from the matched pattern.
* Extending the parsers field with a {@code Parser} object in the constructor
* is needed in this case.
*
* A new capacity type for the existing parsers could be added by extending
* the {@code QueueCapacityVector.QueueCapacityType} with a new type and its
* associated postfix symbol.
*/
public class QueueCapacityConfigParser {
private static final String UNIFORM_REGEX = "^([0-9.]+)(.*)";
private static final String RESOURCE_REGEX = "^\\[([\\w\\.,\\-_%\\ /]+=[\\w\\.,\\-_%\\ /]+)+\\]$";
private static final Pattern RESOURCE_PATTERN = Pattern.compile(RESOURCE_REGEX);
private static final Pattern UNIFORM_PATTERN = Pattern.compile(UNIFORM_REGEX);
public static final String FLOAT_DIGIT_REGEX = "[0-9.]";
private final List<Parser> parsers = new ArrayList<>();
public QueueCapacityConfigParser() {
parsers.add(new Parser(RESOURCE_PATTERN, this::heterogeneousParser));
parsers.add(new Parser(UNIFORM_PATTERN, this::uniformParser));
}
/**
* Creates a {@code QueueCapacityVector} parsed from the capacity configuration
* property set for a queue.
* @param conf configuration object
* @param queuePath queue for which the capacity property is parsed
* @param label node label
* @return a parsed capacity vector
*/
public QueueCapacityVector parse(CapacitySchedulerConfiguration conf,
String queuePath, String label) {
if (queuePath.equals(CapacitySchedulerConfiguration.ROOT)) {
return QueueCapacityVector.of(100f, QueueCapacityType.PERCENTAGE);
}
String propertyName = CapacitySchedulerConfiguration.getNodeLabelPrefix(
queuePath, label) + CapacitySchedulerConfiguration.CAPACITY;
String capacityString = conf.get(propertyName);
if (capacityString == null) {
return new QueueCapacityVector();
}
// Trim all spaces from capacity string
capacityString = capacityString.replaceAll(" ", "");
for (Parser parser : parsers) {
Matcher matcher = parser.regex.matcher(capacityString);
if (matcher.find()) {
return parser.parser.apply(matcher);
}
}
return new QueueCapacityVector();
}
/**
* A parser method that is usable on uniform capacity values e.g. percentage or
* weight.
* @param matcher a regex matcher that contains parsed value and its possible
* suffix
* @return a parsed capacity vector
*/
private QueueCapacityVector uniformParser(Matcher matcher) {
QueueCapacityType capacityType = null;
String value = matcher.group(1);
if (matcher.groupCount() == 2) {
String matchedSuffix = matcher.group(2);
for (QueueCapacityType suffix : QueueCapacityType.values()) {
// Absolute uniform syntax is not supported
if (suffix.equals(QueueCapacityType.ABSOLUTE)) {
continue;
}
// when capacity is given in percentage, we do not need % symbol
String uniformSuffix = suffix.getPostfix().replaceAll("%", "");
if (uniformSuffix.equals(matchedSuffix)) {
capacityType = suffix;
}
}
}
if (capacityType == null) {
return new QueueCapacityVector();
}
return QueueCapacityVector.of(Float.parseFloat(value), capacityType);
}
/**
* A parser method that is usable on resource capacity values e.g. mixed or
* absolute resource.
* @param matcher a regex matcher that contains the matched resource string
* @return a parsed capacity vector
*/
private QueueCapacityVector heterogeneousParser(Matcher matcher) {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
/*
* Absolute resource configuration for a queue will be grouped by "[]".
* Syntax of absolute resource config could be like below
* "memory=4Gi vcores=2". Ideally this means "4GB of memory and 2 vcores".
*/
// Get the sub-group.
String bracketedGroup = matcher.group(0);
// Get the string inside starting and closing []
bracketedGroup = bracketedGroup.substring(1, bracketedGroup.length() - 1);
// Split by comma and equals delimiter eg. the string memory=1024,vcores=6
// is converted to an array of array as {{memory,1024}, {vcores, 6}}
for (String kvPair : bracketedGroup.trim().split(",")) {
String[] splits = kvPair.split("=");
// Ensure that each sub string is key value pair separated by '='.
if (splits.length > 1) {
setCapacityVector(capacityVector, splits[0], splits[1]);
}
}
// Memory always have to be defined
if (capacityVector.getMemory() == 0L) {
return new QueueCapacityVector();
}
return capacityVector;
}
private void setCapacityVector(
QueueCapacityVector resource, String resourceName, String resourceValue) {
QueueCapacityType capacityType = QueueCapacityType.ABSOLUTE;
// Extract suffix from a value e.g. for 6w extract w
String suffix = resourceValue.replaceAll(FLOAT_DIGIT_REGEX, "");
if (!resourceValue.endsWith(suffix)) {
return;
}
float parsedResourceValue = Float.parseFloat(resourceValue.substring(
0, resourceValue.length() - suffix.length()));
float convertedValue = parsedResourceValue;
if (!suffix.isEmpty() && UnitsConversionUtil.KNOWN_UNITS.contains(suffix)) {
// Convert all incoming units to MB if units is configured.
convertedValue = UnitsConversionUtil.convert(suffix, "Mi", (long) parsedResourceValue);
} else {
for (QueueCapacityType capacityTypeSuffix : QueueCapacityType.values()) {
if (capacityTypeSuffix.getPostfix().equals(suffix)) {
capacityType = capacityTypeSuffix;
}
}
}
resource.setResource(resourceName, convertedValue, capacityType);
}
/**
* Checks whether the given capacity string is in a capacity vector compatible
* format.
* @param configuredCapacity capacity string
* @return true, if capacity string is in capacity vector format,
* false otherwise
*/
public boolean isCapacityVectorFormat(String configuredCapacity) {
return configuredCapacity != null
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
}
private static class Parser {
private final Pattern regex;
private final Function<Matcher, QueueCapacityVector> parser;
Parser(Pattern regex, Function<Matcher, QueueCapacityVector> parser) {
this.regex = regex;
this.parser = parser;
}
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
public class TestQueueCapacityVector {
private static final String CUSTOM_RESOURCE = "custom";
public static final String MIXED_CAPACITY_VECTOR_STRING =
"[custom=3.0,memory-mb=10.0w,vcores=6.0%]";
private final YarnConfiguration conf = new YarnConfiguration();
@Before
public void setUp() {
conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
ResourceUtils.resetResourceTypes(conf);
}
@Test
public void getResourceNamesByCapacityType() {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.PERCENTAGE);
capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
// custom is not set, defaults to 0
Assert.assertEquals(1, capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.ABSOLUTE).size());
Assert.assertTrue(capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.ABSOLUTE).contains(CUSTOM_RESOURCE));
Assert.assertEquals(2, capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.PERCENTAGE).size());
Assert.assertTrue(capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.PERCENTAGE).contains(VCORES_URI));
Assert.assertTrue(capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.PERCENTAGE).contains(MEMORY_URI));
Assert.assertEquals(10, capacityVector.getResource(MEMORY_URI).getResourceValue(), EPSILON);
Assert.assertEquals(6, capacityVector.getResource(VCORES_URI).getResourceValue(), EPSILON);
}
@Test
public void isResourceOfType() {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT);
capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
capacityVector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE);
Assert.assertTrue(capacityVector.isResourceOfType(MEMORY_URI, QueueCapacityType.WEIGHT));
Assert.assertTrue(capacityVector.isResourceOfType(VCORES_URI, QueueCapacityType.PERCENTAGE));
Assert.assertTrue(capacityVector.isResourceOfType(CUSTOM_RESOURCE, QueueCapacityType.ABSOLUTE));
}
@Test
public void testIterator() {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
List<QueueCapacityVectorEntry> entries = Lists.newArrayList(capacityVector);
Assert.assertEquals(3, entries.size());
QueueCapacityVector emptyCapacityVector = new QueueCapacityVector();
List<QueueCapacityVectorEntry> emptyEntries = Lists.newArrayList(emptyCapacityVector);
Assert.assertEquals(0, emptyEntries.size());
}
@Test
public void testToString() {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT);
capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
capacityVector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE);
Assert.assertEquals(MIXED_CAPACITY_VECTOR_STRING, capacityVector.toString());
QueueCapacityVector emptyCapacityVector = new QueueCapacityVector();
Assert.assertEquals("[]", emptyCapacityVector.toString());
}
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
public class TestResourceVector {
private final static String CUSTOM_RESOURCE = "custom";
private final YarnConfiguration conf = new YarnConfiguration();
@Before
public void setUp() {
conf.set(YarnConfiguration.RESOURCE_TYPES, CUSTOM_RESOURCE);
ResourceUtils.resetResourceTypes(conf);
}
@Test
public void testCreation() {
ResourceVector zeroResourceVector = ResourceVector.newInstance();
Assert.assertEquals(0, zeroResourceVector.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(0, zeroResourceVector.getValue(VCORES_URI), EPSILON);
Assert.assertEquals(0, zeroResourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
ResourceVector uniformResourceVector = ResourceVector.of(10);
Assert.assertEquals(10, uniformResourceVector.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(10, uniformResourceVector.getValue(VCORES_URI), EPSILON);
Assert.assertEquals(10, uniformResourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
Map<String, Long> customResources = new HashMap<>();
customResources.put(CUSTOM_RESOURCE, 2L);
Resource resource = Resource.newInstance(10, 5, customResources);
ResourceVector resourceVectorFromResource = ResourceVector.of(resource);
Assert.assertEquals(10, resourceVectorFromResource.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(5, resourceVectorFromResource.getValue(VCORES_URI), EPSILON);
Assert.assertEquals(2, resourceVectorFromResource.getValue(CUSTOM_RESOURCE), EPSILON);
}
@Test
public void testSubtract() {
ResourceVector lhsResourceVector = ResourceVector.of(13);
ResourceVector rhsResourceVector = ResourceVector.of(5);
lhsResourceVector.subtract(rhsResourceVector);
Assert.assertEquals(8, lhsResourceVector.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(8, lhsResourceVector.getValue(VCORES_URI), EPSILON);
Assert.assertEquals(8, lhsResourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
ResourceVector negativeResourceVector = ResourceVector.of(-100);
// Check whether overflow causes any issues
negativeResourceVector.subtract(ResourceVector.of(Float.MAX_VALUE));
Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(VCORES_URI), EPSILON);
Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(CUSTOM_RESOURCE),
EPSILON);
}
@Test
public void testIncrement() {
ResourceVector resourceVector = ResourceVector.of(13);
resourceVector.increment(MEMORY_URI, 5);
Assert.assertEquals(18, resourceVector.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(13, resourceVector.getValue(VCORES_URI), EPSILON);
Assert.assertEquals(13, resourceVector.getValue(CUSTOM_RESOURCE), EPSILON);
// Check whether overflow causes any issues
ResourceVector maxFloatResourceVector = ResourceVector.of(Float.MAX_VALUE);
maxFloatResourceVector.increment(MEMORY_URI, 100);
Assert.assertEquals(Float.MAX_VALUE, maxFloatResourceVector.getValue(MEMORY_URI), EPSILON);
}
@Test
public void testEquals() {
ResourceVector resourceVector = ResourceVector.of(13);
ResourceVector resourceVectorOther = ResourceVector.of(14);
Resource resource = Resource.newInstance(13, 13);
Assert.assertNotEquals(null, resourceVector);
Assert.assertNotEquals(resourceVectorOther, resourceVector);
Assert.assertNotEquals(resource, resourceVector);
ResourceVector resourceVectorOne = ResourceVector.of(1);
resourceVectorOther.subtract(resourceVectorOne);
Assert.assertEquals(resourceVectorOther, resourceVector);
}
}

View File

@ -0,0 +1,241 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources.GB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
public class TestQueueCapacityConfigParser {
private static final String ALL_RESOURCE_TEMPLATE = "[memory-mb=%s, vcores=%s, yarn.io/gpu=%s]";
private static final String MEMORY_VCORE_TEMPLATE = "[memory-mb=%s, vcores=%s]";
private static final String MEMORY_ABSOLUTE = "12Gi";
private static final float VCORE_ABSOLUTE = 6;
private static final float GPU_ABSOLUTE = 10;
private static final float PERCENTAGE_VALUE = 50f;
private static final float MEMORY_MIXED = 1024;
private static final float WEIGHT_VALUE = 6;
private static final String QUEUE = "root.test";
private static final String ABSOLUTE_RESOURCE = String.format(
ALL_RESOURCE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE, GPU_ABSOLUTE);
private static final String ABSOLUTE_RESOURCE_MEMORY_VCORE = String.format(
MEMORY_VCORE_TEMPLATE, MEMORY_ABSOLUTE, VCORE_ABSOLUTE);
private static final String MIXED_RESOURCE = String.format(
ALL_RESOURCE_TEMPLATE, MEMORY_MIXED, PERCENTAGE_VALUE + "%", WEIGHT_VALUE + "w");
private static final String RESOURCE_TYPES = GPU_URI;
public static final String NONEXISTINGSUFFIX = "50nonexistingsuffix";
public static final String EMPTY_BRACKET = "[]";
public static final String INVALID_CAPACITY_BRACKET = "[invalid]";
public static final String INVALID_CAPACITY_FORMAT = "[memory-100,vcores-60]";
private final QueueCapacityConfigParser capacityConfigParser
= new QueueCapacityConfigParser();
@Test
public void testPercentageCapacityConfig() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setCapacity(QUEUE, PERCENTAGE_VALUE);
QueueCapacityVector percentageCapacityVector = capacityConfigParser.parse(conf, QUEUE,
NO_LABEL);
QueueCapacityVectorEntry memory = percentageCapacityVector.getResource(MEMORY_URI);
QueueCapacityVectorEntry vcore = percentageCapacityVector.getResource(VCORES_URI);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, memory.getVectorResourceType());
Assert.assertEquals(PERCENTAGE_VALUE, memory.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcore.getVectorResourceType());
Assert.assertEquals(PERCENTAGE_VALUE, vcore.getResourceValue(), EPSILON);
QueueCapacityVector rootCapacityVector = capacityConfigParser.parse(conf,
CapacitySchedulerConfiguration.ROOT, NO_LABEL);
QueueCapacityVectorEntry memoryRoot = rootCapacityVector.getResource(MEMORY_URI);
QueueCapacityVectorEntry vcoreRoot = rootCapacityVector.getResource(VCORES_URI);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, memoryRoot.getVectorResourceType());
Assert.assertEquals(100f, memoryRoot.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcoreRoot.getVectorResourceType());
Assert.assertEquals(100f, vcoreRoot.getResourceValue(), EPSILON);
}
@Test
public void testWeightCapacityConfig() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setNonLabeledQueueWeight(QUEUE, WEIGHT_VALUE);
QueueCapacityVector weightCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
QueueCapacityVectorEntry memory = weightCapacityVector.getResource(MEMORY_URI);
QueueCapacityVectorEntry vcore = weightCapacityVector.getResource(VCORES_URI);
Assert.assertEquals(QueueCapacityType.WEIGHT, memory.getVectorResourceType());
Assert.assertEquals(WEIGHT_VALUE, memory.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.WEIGHT, vcore.getVectorResourceType());
Assert.assertEquals(WEIGHT_VALUE, vcore.getResourceValue(), EPSILON);
}
@Test
public void testAbsoluteCapacityVectorConfig() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) +
CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE);
conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
ResourceUtils.resetResourceTypes(conf);
QueueCapacityVector absoluteCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(MEMORY_URI)
.getVectorResourceType());
Assert.assertEquals(12 * GB, absoluteCapacityVector.getResource(MEMORY_URI)
.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI)
.getVectorResourceType());
Assert.assertEquals(VCORE_ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI)
.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI)
.getVectorResourceType());
Assert.assertEquals(GPU_ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI)
.getResourceValue(), EPSILON);
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) +
CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
QueueCapacityVector withoutGpuVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
Assert.assertEquals(3, withoutGpuVector.getResourceCount());
Assert.assertEquals(0f, withoutGpuVector.getResource(GPU_URI).getResourceValue(), EPSILON);
}
@Test
public void testMixedCapacityConfig() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, MIXED_RESOURCE);
conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
ResourceUtils.resetResourceTypes(conf);
QueueCapacityVector mixedCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
Assert.assertEquals(QueueCapacityType.ABSOLUTE,
mixedCapacityVector.getResource(MEMORY_URI).getVectorResourceType());
Assert.assertEquals(MEMORY_MIXED, mixedCapacityVector.getResource(MEMORY_URI)
.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.PERCENTAGE,
mixedCapacityVector.getResource(VCORES_URI).getVectorResourceType());
Assert.assertEquals(PERCENTAGE_VALUE,
mixedCapacityVector.getResource(VCORES_URI).getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.WEIGHT,
mixedCapacityVector.getResource(GPU_URI).getVectorResourceType());
Assert.assertEquals(WEIGHT_VALUE,
mixedCapacityVector.getResource(GPU_URI).getResourceValue(), EPSILON);
// Test undefined capacity type default value
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
QueueCapacityVector mixedCapacityVectorWithGpuUndefined =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
Assert.assertEquals(QueueCapacityType.ABSOLUTE,
mixedCapacityVectorWithGpuUndefined.getResource(MEMORY_URI).getVectorResourceType());
Assert.assertEquals(0, mixedCapacityVectorWithGpuUndefined.getResource(GPU_URI)
.getResourceValue(), EPSILON);
}
@Test
public void testInvalidCapacityConfigs() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, NONEXISTINGSUFFIX);
QueueCapacityVector capacityVectorWithInvalidSuffix =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
List<QueueCapacityVectorEntry> entriesWithInvalidSuffix =
Lists.newArrayList(capacityVectorWithInvalidSuffix.iterator());
Assert.assertEquals(0, entriesWithInvalidSuffix.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, INVALID_CAPACITY_FORMAT);
QueueCapacityVector invalidDelimiterCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
List<QueueCapacityVectorEntry> invalidDelimiterEntries =
Lists.newArrayList(invalidDelimiterCapacityVector.iterator());
Assert.assertEquals(0, invalidDelimiterEntries.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, INVALID_CAPACITY_BRACKET);
QueueCapacityVector invalidCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
List<QueueCapacityVectorEntry> resources =
Lists.newArrayList(invalidCapacityVector.iterator());
Assert.assertEquals(0, resources.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, EMPTY_BRACKET);
QueueCapacityVector emptyBracketCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
List<QueueCapacityVectorEntry> emptyEntries =
Lists.newArrayList(emptyBracketCapacityVector.iterator());
Assert.assertEquals(0, emptyEntries.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, "");
QueueCapacityVector emptyCapacity =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
List<QueueCapacityVectorEntry> emptyResources =
Lists.newArrayList(emptyCapacity.iterator());
Assert.assertEquals(emptyResources.size(), 0);
conf.unset(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY);
QueueCapacityVector nonSetCapacity =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
List<QueueCapacityVectorEntry> nonSetResources =
Lists.newArrayList(nonSetCapacity.iterator());
Assert.assertEquals(nonSetResources.size(), 0);
}
}