YARN-10965. Centralize queue resource calculation based on CapacityVectors. Contributed by Andras Gyori

This commit is contained in:
Szilard Nemeth 2023-01-26 19:45:54 +01:00
parent 815cde9810
commit cf1b3711cb
32 changed files with 2949 additions and 231 deletions

View File

@ -818,6 +818,28 @@ public class ResourceUtils {
return res;
}
/**
 * Scales every resource value of the given resource by a multiplier and
 * rounds each product down.
 *
 * @param resource resource whose values are read; only its getters are used
 * @param multiplier factor applied to each resource value
 * @return a newly created resource holding the floored products
 */
public static Resource multiplyFloor(Resource resource, double multiplier) {
  Resource scaled = Resource.newInstance(0, 0);
  for (ResourceInformation info : resource.getResources()) {
    double product = info.getValue() * multiplier;
    scaled.setResourceValue(info.getName(), (long) Math.floor(product));
  }
  return scaled;
}
/**
 * Scales every resource value of the given resource by a multiplier and
 * rounds each product to the closest long using {@link Math#round(double)}.
 *
 * @param resource resource whose values are read; only its getters are used
 * @param multiplier factor applied to each resource value
 * @return a newly created resource holding the rounded products
 */
public static Resource multiplyRound(Resource resource, double multiplier) {
  Resource scaled = Resource.newInstance(0, 0);
  for (ResourceInformation info : resource.getResources()) {
    double product = info.getValue() * multiplier;
    scaled.setResourceValue(info.getName(), Math.round(product));
  }
  return scaled;
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource createResourceFromString(

View File

@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType.BRANCH_DOWNSCALED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ResourceCalculationDriver.MB_UNIT;
/**
 * Capacity calculator for queue resources that are configured with the
 * {@link ResourceUnitCapacityType#ABSOLUTE} capacity type.
 */
public class AbsoluteResourceCapacityCalculator extends AbstractQueueCapacityCalculator {

  /**
   * Computes the normalized resource ratios of the parent queue held by the driver
   * before any effective resource value is calculated.
   *
   * @param resourceCalculationDriver driver that contains the parent queue
   */
  @Override
  public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
    setNormalizedResourceRatio(resourceCalculationDriver);
  }

  /**
   * Calculates the minimum effective resource as the product of the normalized ratio
   * (1 when no ratio is stored for the label), the remaining ratio of the resource and
   * the configured minimum value of the evaluated resource.
   *
   * @param resourceCalculationDriver driver that holds the normalized and remaining ratios
   * @param context the queue and resource evaluated in the current iteration
   * @param label node label
   * @return minimum effective resource
   */
  @Override
  public double calculateMinimumResource(
      ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
      String label) {
    String resourceName = context.getResourceName();
    double normalizedRatio = resourceCalculationDriver.getNormalizedResourceRatios().getOrDefault(
        label, ResourceVector.of(1)).getValue(resourceName);
    double remainingResourceRatio = resourceCalculationDriver.getRemainingRatioOfResource(
        label, resourceName);

    return normalizedRatio * remainingResourceRatio * context.getCurrentMinimumCapacityEntry(
        label).getResourceValue();
  }

  /**
   * Returns the configured maximum value of the evaluated resource unchanged.
   *
   * @param resourceCalculationDriver driver of the current calculation (not used here)
   * @param context the queue and resource evaluated in the current iteration
   * @param label node label
   * @return maximum effective resource
   */
  @Override
  public double calculateMaximumResource(
      ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
      String label) {
    return context.getCurrentMaximumCapacityEntry(label).getResourceValue();
  }

  /**
   * Sets the capacities of the queue based on the updated cluster resource of the label.
   *
   * @param resourceCalculationDriver driver whose update context supplies the cluster resource
   * @param queue queue on which the capacities are set
   * @param label node label
   */
  @Override
  public void updateCapacitiesAfterCalculation(
      ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
    CapacitySchedulerQueueCapacityHandler.setQueueCapacities(
        resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label), queue, label);
  }

  @Override
  public ResourceUnitCapacityType getCapacityType() {
    return ResourceUnitCapacityType.ABSOLUTE;
  }

  /**
   * Calculates the normalized resource ratio of a parent queue under which children are
   * defined with absolute capacity type. If the effective resource of the parent is less
   * than the aggregated configured absolute resource of its children, the resource ratio
   * will be less than 1.
   *
   * @param calculationDriver the driver, which contains the parent queue that will form the base
   *                          of the normalization calculation
   */
  public static void setNormalizedResourceRatio(ResourceCalculationDriver calculationDriver) {
    CSQueue queue = calculationDriver.getQueue();

    // ManagedParents assign zero capacity to queues in case of overutilization, downscaling is
    // turned off for their children. The check does not depend on the label, so it is
    // evaluated once instead of on every loop iteration.
    if (queue instanceof ManagedParentQueue) {
      return;
    }

    for (String label : queue.getConfiguredNodeLabels()) {
      for (String resourceName : queue.getConfiguredCapacityVector(label).getResourceNames()) {
        long effectiveMinResource = queue.getQueueResourceQuotas().getEffectiveMinResource(
            label).getResourceValue(resourceName);
        long childrenConfiguredResource =
            sumAbsoluteResourceOfChildren(queue, label, resourceName);

        // If no children is using ABSOLUTE capacity type, normalization is not needed
        if (childrenConfiguredResource == 0) {
          continue;
        }

        // Factor to scale down effective resource: When cluster has sufficient
        // resources, effective_min_resources will be same as configured
        // min_resources.
        float numeratorForMinRatio = childrenConfiguredResource;
        if (effectiveMinResource < childrenConfiguredResource) {
          // Reuse the value fetched above instead of querying the quotas a second time
          numeratorForMinRatio = effectiveMinResource;
          calculationDriver.getUpdateContext().addUpdateWarning(BRANCH_DOWNSCALED.ofQueue(
              queue.getQueuePath()));
        }

        // Memory is converted from MB_UNIT, every other resource from the empty unit, to the
        // unit reported by the updated cluster resource
        String unit = resourceName.equals(MEMORY_URI) ? MB_UNIT : "";
        long convertedValue = UnitsConversionUtil.convert(unit, calculationDriver.getUpdateContext()
            .getUpdatedClusterResource(label).getResourceInformation(resourceName).getUnits(),
            childrenConfiguredResource);

        // A zero denominator would make the ratio meaningless, no ratio is stored in that case
        if (convertedValue != 0) {
          Map<String, ResourceVector> normalizedResourceRatios =
              calculationDriver.getNormalizedResourceRatios();
          normalizedResourceRatios.putIfAbsent(label, ResourceVector.newInstance());
          normalizedResourceRatios.get(label).setValue(resourceName, numeratorForMinRatio /
              convertedValue);
        }
      }
    }
  }

  /**
   * Sums the configured ABSOLUTE minimum values of a resource over the direct children
   * of a queue. Children that do not have the label configured are skipped.
   */
  private static long sumAbsoluteResourceOfChildren(
      CSQueue queue, String label, String resourceName) {
    long childrenConfiguredResource = 0;
    for (CSQueue childQueue : queue.getChildQueues()) {
      if (!childQueue.getConfiguredNodeLabels().contains(label)) {
        continue;
      }
      QueueCapacityVector capacityVector = childQueue.getConfiguredCapacityVector(label);
      if (capacityVector.isResourceOfType(resourceName, ResourceUnitCapacityType.ABSOLUTE)) {
        childrenConfiguredResource += capacityVector.getResource(resourceName)
            .getResourceValue();
      }
    }
    return childrenConfiguredResource;
  }
}

View File

@ -115,6 +115,7 @@ public abstract class AbstractCSQueue implements CSQueue {
CapacityConfigType.NONE;
protected Map<String, QueueCapacityVector> configuredCapacityVectors;
protected Map<String, QueueCapacityVector> configuredMaxCapacityVectors;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@ -379,7 +380,10 @@ public abstract class AbstractCSQueue implements CSQueue {
this.configuredCapacityVectors = configuration
.parseConfiguredResourceVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels());
this.configuredMaxCapacityVectors = configuration
.parseConfiguredMaximumCapacityVector(queuePath.getFullPath(),
this.queueNodeLabelsSettings.getConfiguredNodeLabels(),
QueueCapacityVector.newInstance());
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
@ -533,7 +537,8 @@ public abstract class AbstractCSQueue implements CSQueue {
private void validateAbsoluteVsPercentageCapacityConfig(
CapacityConfigType localType) {
if (!queuePath.isRoot()
&& !this.capacityConfigType.equals(localType)) {
&& !this.capacityConfigType.equals(localType) &&
queueContext.getConfiguration().isLegacyQueueMode()) {
throw new IllegalArgumentException("Queue '" + getQueuePath()
+ "' should use either percentage based capacity"
+ " configuration or absolute resource.");
@ -572,11 +577,25 @@ public abstract class AbstractCSQueue implements CSQueue {
}
@Override
public QueueCapacityVector getConfiguredCapacityVector(
String label) {
public QueueCapacityVector getConfiguredCapacityVector(String label) {
return configuredCapacityVectors.get(label);
}
/**
 * Returns the configured maximum capacity vector stored for the given node label.
 *
 * @param label node label (partition) used as the lookup key
 * @return the vector mapped to the label in {@code configuredMaxCapacityVectors}
 */
@Override
public QueueCapacityVector getConfiguredMaxCapacityVector(String label) {
return configuredMaxCapacityVectors.get(label);
}
/**
 * Stores the given vector as the configured (minimum) capacity vector of the label,
 * replacing any vector previously mapped to it in {@code configuredCapacityVectors}.
 *
 * @param label node label (partition)
 * @param minCapacityVector capacity vector to store
 */
@Override
public void setConfiguredMinCapacityVector(String label, QueueCapacityVector minCapacityVector) {
configuredCapacityVectors.put(label, minCapacityVector);
}
/**
 * Stores the given vector as the configured maximum capacity vector of the label,
 * replacing any vector previously mapped to it in {@code configuredMaxCapacityVectors}.
 *
 * @param label node label (partition)
 * @param maxCapacityVector capacity vector to store
 */
@Override
public void setConfiguredMaxCapacityVector(String label, QueueCapacityVector maxCapacityVector) {
configuredMaxCapacityVectors.put(label, maxCapacityVector);
}
protected QueueInfo getQueueInfo() {
// Deliberately doesn't use lock here, because this method will be invoked
// from schedulerApplicationAttempt, to avoid deadlock, sacrifice
@ -691,6 +710,11 @@ public abstract class AbstractCSQueue implements CSQueue {
return readLock;
}
/**
 * Returns the {@code writeLock} field of this queue.
 *
 * @return write lock of the queue
 */
@Override
public ReentrantReadWriteLock.WriteLock getWriteLock() {
return writeLock;
}
private Resource getCurrentLimitResource(String nodePartition,
Resource clusterResource, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
@ -827,6 +851,11 @@ public abstract class AbstractCSQueue implements CSQueue {
}
/**
 * Returns the configured node labels, as reported by {@code queueNodeLabelsSettings}.
 *
 * @return configured node labels of the queue
 */
@Override
public Set<String> getConfiguredNodeLabels() {
return queueNodeLabelsSettings.getConfiguredNodeLabels();
}
/**
 * Substitutes {@code NO_LABEL} for a missing partition name.
 *
 * @param partition partition name, may be null
 * @return the given partition, or {@code NO_LABEL} when it is null
 */
private static String ensurePartition(String partition) {
  if (partition == null) {
    return NO_LABEL;
  }
  return partition;
}

View File

@ -88,6 +88,9 @@ import org.slf4j.LoggerFactory;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getACLsForFlexibleAutoCreatedLeafQueue;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.PERCENTAGE;
public class AbstractLeafQueue extends AbstractCSQueue {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractLeafQueue.class);
@ -164,7 +167,7 @@ public class AbstractLeafQueue extends AbstractCSQueue {
resourceCalculator);
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps<>();
}
@SuppressWarnings("checkstyle:nowhitespaceafter")
@ -1936,6 +1939,49 @@ public class AbstractLeafQueue extends AbstractCSQueue {
currentResourceLimits.getLimit()));
}
/**
 * Refreshes the state of this leaf queue that is derived from the cluster resource.
 * In order, it records the cluster resource, updates the maximum applications and the
 * current resource limits, refreshes the queue resource limits info, the usage ratio
 * and the metrics, activates applications, flags the user limit for recomputation and
 * recomputes the user limit and headroom of every schedulable application.
 *
 * NOTE(review): no lock is acquired in this method itself; presumably the callees or
 * the callers take care of synchronization - confirm.
 *
 * @param clusterResource cluster resource the refresh is based on
 * @param resourceLimits resource limits passed on to the current resource limit update
 */
@Override
public void refreshAfterResourceCalculation(Resource clusterResource,
ResourceLimits resourceLimits) {
// Remember the cluster resource this refresh was based on
lastClusterResource = clusterResource;
// Update maximum applications for the queue and for users
updateMaximumApplications();
updateCurrentResourceLimits(resourceLimits, clusterResource);
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
setQueueResourceLimitsInfo(clusterResource);
// Update user consumedRatios
recalculateQueueUsageRatio(clusterResource, null);
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
NO_LABEL, this);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
activateApplications();
// In case of any resource change, invalidate recalculateULCount to clear
// the computed user-limit.
usersManager.userLimitNeedsRecompute();
// Update application properties
for (FiCaSchedulerApp application : orderingPolicy
.getSchedulableEntities()) {
computeUserLimitAndSetHeadroom(application, clusterResource,
NO_LABEL,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
}
}
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
@ -2225,10 +2271,12 @@ public class AbstractLeafQueue extends AbstractCSQueue {
}
/**
 * Sets the capacity of the {@code NO_LABEL} partition and stores the same value,
 * multiplied by 100, as a PERCENTAGE vector among the configured capacity vectors.
 *
 * @param capacity capacity value forwarded unchanged to {@code queueCapacities}
 */
public void setCapacity(float capacity) {
  QueueCapacityVector percentageVector = QueueCapacityVector.of(capacity * 100, PERCENTAGE);
  configuredCapacityVectors.put(NO_LABEL, percentageVector);
  queueCapacities.setCapacity(capacity);
}
/**
 * Sets the capacity of the given node label and stores the same value, multiplied
 * by 100, as a PERCENTAGE vector among the configured capacity vectors.
 *
 * @param nodeLabel node label whose capacity is set
 * @param capacity capacity value forwarded unchanged to {@code queueCapacities}
 */
public void setCapacity(String nodeLabel, float capacity) {
  QueueCapacityVector percentageVector = QueueCapacityVector.of(capacity * 100, PERCENTAGE);
  configuredCapacityVectors.put(nodeLabel, percentageVector);
  queueCapacities.setCapacity(nodeLabel, capacity);
}

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import java.util.Set;
/**
* A strategy class to encapsulate queue capacity setup and resource calculation
* logic.
*/
public abstract class AbstractQueueCapacityCalculator {
/**
* Sets the metrics and statistics after effective resource values calculation.
*
* @param resourceCalculationDriver driver that contains the intermediate calculation results for
* a queue branch
* @param queue the queue on which the calculations are based
* @param label node label
*/
public abstract void updateCapacitiesAfterCalculation(
ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label);
/**
* Returns the capacity type the calculator could handle.
*
* @return capacity type
*/
public abstract ResourceUnitCapacityType getCapacityType();
/**
* Calculates the minimum effective resource.
*
* @param resourceCalculationDriver driver that contains the intermediate calculation results for
* a queue branch
* @param context the units evaluated in the current iteration phase
* @param label node label
* @return minimum effective resource
*/
public abstract double calculateMinimumResource(ResourceCalculationDriver resourceCalculationDriver,
CalculationContext context,
String label);
/**
* Calculates the maximum effective resource.
*
* @param resourceCalculationDriver driver that contains the intermediate calculation results for
* a queue branch
* @param context the units evaluated in the current iteration phase
* @param label node label
* @return maximum effective resource
*/
public abstract double calculateMaximumResource(ResourceCalculationDriver resourceCalculationDriver,
CalculationContext context,
String label);
/**
* Executes all logic that must be called prior to the effective resource value calculations.
*
* @param resourceCalculationDriver driver that contains the parent queue on which the
* prerequisite calculation should be made
*/
public abstract void calculateResourcePrerequisites(
ResourceCalculationDriver resourceCalculationDriver);
/**
* Returns all resource names that are defined for the capacity type that is
* handled by the calculator, i.e. the type returned by {@link #getCapacityType()}.
*
* @param queue queue for which the capacity vector is defined
* @param label node label
* @return resource names
*/
protected Set<String> getResourceNames(CSQueue queue, String label) {
return getResourceNames(queue, label, getCapacityType());
}
/**
* Returns all resource names that are defined for a capacity type in the configured
* capacity vector of the queue.
*
* @param queue queue for which the capacity vector is defined
* @param label node label
* @param capacityType capacity type for which the resource names are defined
* @return resource names
*/
protected Set<String> getResourceNames(CSQueue queue, String label,
ResourceUnitCapacityType capacityType) {
return queue.getConfiguredCapacityVector(label)
.getResourceNamesByCapacityType(capacityType);
}
}

View File

@ -121,7 +121,7 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* cumulative capacity in the cluster
*/
public float getAbsoluteCapacity();
/**
* Get the configured maximum-capacity of the queue.
* @return the configured maximum-capacity of the queue
@ -169,7 +169,7 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* @return max-parallel-applications
*/
public int getMaxParallelApps();
/**
* Get child queues
* @return child queues
@ -270,6 +270,9 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException;
/**
 * Refreshes the queue after a resource calculation.
 * NOTE(review): the exact contract is implementation specific; presumably called once
 * the effective resources of the queue have been recalculated - confirm.
 * @param clusterResource the cluster resource the refresh is based on
 * @param resourceLimits the resource limits to apply to the queue
 */
public void refreshAfterResourceCalculation(
Resource clusterResource, ResourceLimits resourceLimits);
/**
* Update the cluster resource for queues as we add/remove nodes
* @param clusterResource the current cluster resource
@ -388,6 +391,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
*/
public ReentrantReadWriteLock.ReadLock getReadLock();
/**
* Get writeLock associated with the Queue.
* @return writeLock of corresponding queue.
*/
ReentrantReadWriteLock.WriteLock getWriteLock();
/**
* Validate submitApplication api so that moveApplication do a pre-check.
* @param applicationId Application ID
@ -433,13 +442,37 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
Resource getEffectiveCapacity(String label);
/**
* Get configured capacity resource vector parsed from the capacity config
* Get configured capacity vector parsed from the capacity config
* of the queue.
* @param label node label (partition)
* @return capacity resource vector
*/
QueueCapacityVector getConfiguredCapacityVector(String label);
/**
* Get configured maximum capacity vector parsed from the capacity config
* of the queue.
* @param label node label (partition)
* @return capacity resource vector
*/
QueueCapacityVector getConfiguredMaxCapacityVector(String label);
/**
* Sets the configured minimum capacity vector to a specific value.
* @param label node label (partition)
* @param minCapacityVector capacity vector
*/
void setConfiguredMinCapacityVector(String label, QueueCapacityVector minCapacityVector);
/**
* Sets the configured maximum capacity vector to a specific value.
* @param label node label (partition)
* @param maxCapacityVector capacity vector
*/
void setConfiguredMaxCapacityVector(String label, QueueCapacityVector maxCapacityVector);
/**
 * Get the node labels configured for the queue.
 * @return configured node labels
 */
Set<String> getConfiguredNodeLabels();
/**
* Get effective capacity of queue. If min/max resource is configured,
* preference will be given to absolute configuration over normal capacity.

View File

@ -75,4 +75,5 @@ public class CSQueueUsageTracker {
/**
 * Returns the {@code queueResourceQuotas} field of this tracker.
 *
 * @return resource quotas of the queue
 */
public QueueResourceQuotas getQueueResourceQuotas() {
return queueResourceQuotas;
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
/**
* A storage class that wraps arguments used in a resource calculation iteration.
*/
public class CalculationContext {
  private final String evaluatedResourceName;
  private final ResourceUnitCapacityType evaluatedCapacityType;
  private final CSQueue evaluatedQueue;

  /**
   * Creates a context for one calculation iteration.
   *
   * @param resourceName name of the resource evaluated in the iteration
   * @param capacityType capacity type evaluated in the iteration
   * @param queue queue evaluated in the iteration
   */
  public CalculationContext(String resourceName, ResourceUnitCapacityType capacityType,
      CSQueue queue) {
    this.evaluatedResourceName = resourceName;
    this.evaluatedCapacityType = capacityType;
    this.evaluatedQueue = queue;
  }

  /**
   * @return name of the resource this context was created with
   */
  public String getResourceName() {
    return evaluatedResourceName;
  }

  /**
   * @return capacity type this context was created with
   */
  public ResourceUnitCapacityType getCapacityType() {
    return evaluatedCapacityType;
  }

  /**
   * @return queue this context was created with
   */
  public CSQueue getQueue() {
    return evaluatedQueue;
  }

  /**
   * Convenience lookup of the entry that belongs to the context's resource name in the
   * configured minimum capacity vector of the context's queue.
   *
   * @param label node label
   * @return capacity vector entry
   */
  public QueueCapacityVectorEntry getCurrentMinimumCapacityEntry(String label) {
    QueueCapacityVector minimumVector = evaluatedQueue.getConfiguredCapacityVector(label);
    return minimumVector.getResource(evaluatedResourceName);
  }

  /**
   * Convenience lookup of the entry that belongs to the context's resource name in the
   * configured maximum capacity vector of the context's queue.
   *
   * @param label node label
   * @return capacity vector entry
   */
  public QueueCapacityVectorEntry getCurrentMaximumCapacityEntry(String label) {
    QueueCapacityVector maximumVector = evaluatedQueue.getConfiguredMaxCapacityVector(label);
    return maximumVector.getResource(evaluatedResourceName);
  }
}

View File

@ -423,6 +423,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
private static final QueueCapacityConfigParser queueCapacityConfigParser
= new QueueCapacityConfigParser();
private static final String LEGACY_QUEUE_MODE_ENABLED = PREFIX + "legacy-queue-mode.enabled";
public static final boolean DEFAULT_LEGACY_QUEUE_MODE = true;
private ConfigurationProperties configurationProperties;
@ -572,8 +574,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
String configuredCapacity = get(getQueuePrefix(queue.getFullPath()) + CAPACITY);
boolean absoluteResourceConfigured = (configuredCapacity != null)
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
boolean isCapacityVectorFormat = queueCapacityConfigParser
.isCapacityVectorFormat(configuredCapacity);
if (absoluteResourceConfigured || configuredWeightAsCapacity(
configuredCapacity)) {
configuredCapacity) || isCapacityVectorFormat) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root. From AbstractCSQueue, absolute resource will be parsed and
// updated. Once nodes are added/removed in cluster, capacity in
@ -623,7 +627,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public float getNonLabeledQueueMaximumCapacity(QueuePath queue) {
String configuredCapacity = get(getQueuePrefix(queue.getFullPath()) + MAXIMUM_CAPACITY);
boolean matcher = (configuredCapacity != null)
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
&& RESOURCE_PATTERN.matcher(configuredCapacity).find()
|| queueCapacityConfigParser.isCapacityVectorFormat(configuredCapacity);
if (matcher) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root. From AbstractCSQueue, absolute resource will be parsed and
@ -819,6 +824,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return Collections.unmodifiableSet(set);
}
/**
 * Sets the capacity property of a queue for a node label to the given capacity
 * vector string.
 *
 * @param queuePath path of the queue
 * @param label node label
 * @param capacityVector capacity vector in its string form, stored as-is
 */
public void setCapacityVector(String queuePath, String label, String capacityVector) {
  set(getNodeLabelPrefix(queuePath, label) + CAPACITY, capacityVector);
}
/**
 * Sets the maximum capacity property of a queue for a node label to the given
 * capacity vector string.
 *
 * @param queuePath path of the queue
 * @param label node label
 * @param capacityVector capacity vector in its string form, stored as-is
 */
public void setMaximumCapacityVector(String queuePath, String label, String capacityVector) {
  set(getNodeLabelPrefix(queuePath, label) + MAXIMUM_CAPACITY, capacityVector);
}
private boolean configuredWeightAsCapacity(String configureValue) {
if (configureValue == null) {
return false;
@ -843,7 +858,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
(configuredCapacity != null) && RESOURCE_PATTERN.matcher(
configuredCapacity).find();
if (absoluteResourceConfigured || configuredWeightAsCapacity(
configuredCapacity)) {
configuredCapacity) || queueCapacityConfigParser.isCapacityVectorFormat(configuredCapacity)) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root. From AbstractCSQueue, absolute resource, and weight will be parsed
// and updated separately. Once nodes are added/removed in cluster,
@ -2701,7 +2716,28 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
String queuePath, Set<String> labels) {
Map<String, QueueCapacityVector> queueResourceVectors = new HashMap<>();
for (String label : labels) {
queueResourceVectors.put(label, queueCapacityConfigParser.parse(this, queuePath, label));
String propertyName = CapacitySchedulerConfiguration.getNodeLabelPrefix(
queuePath, label) + CapacitySchedulerConfiguration.CAPACITY;
String capacityString = get(propertyName);
queueResourceVectors.put(label, queueCapacityConfigParser.parse(capacityString, queuePath));
}
return queueResourceVectors;
}
public Map<String, QueueCapacityVector> parseConfiguredMaximumCapacityVector(
String queuePath, Set<String> labels, QueueCapacityVector defaultVector) {
Map<String, QueueCapacityVector> queueResourceVectors = new HashMap<>();
for (String label : labels) {
String propertyName = CapacitySchedulerConfiguration.getNodeLabelPrefix(
queuePath, label) + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY;
String capacityString = get(propertyName);
QueueCapacityVector capacityVector = queueCapacityConfigParser.parse(capacityString,
queuePath);
if (capacityVector.isEmpty()) {
capacityVector = defaultVector;
}
queueResourceVectors.put(label, capacityVector);
}
return queueResourceVectors;
@ -2806,6 +2842,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
}
String units = getUnits(splits[1]);
if (!UnitsConversionUtil.KNOWN_UNITS.contains(units)) {
return;
}
Long resourceValue = Long
.valueOf(splits[1].substring(0, splits[1].length() - units.length()));
@ -2888,6 +2929,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return normalizePolicyName(policyClassName.trim());
}
/**
 * Reads the {@code LEGACY_QUEUE_MODE_ENABLED} property.
 *
 * @return the configured value, or {@code DEFAULT_LEGACY_QUEUE_MODE} when the
 * property is not set
 */
public boolean isLegacyQueueMode() {
return getBoolean(LEGACY_QUEUE_MODE_ENABLED, DEFAULT_LEGACY_QUEUE_MODE);
}
/**
 * Sets the {@code LEGACY_QUEUE_MODE_ENABLED} property.
 *
 * @param value new value of the property
 */
public void setLegacyQueueModeEnabled(boolean value) {
setBoolean(LEGACY_QUEUE_MODE_ENABLED, value);
}
public boolean getMultiNodePlacementEnabled() {
return getBoolean(MULTI_NODE_PLACEMENT_ENABLED,
DEFAULT_MULTI_NODE_PLACEMENT_ENABLED);

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
/**
* Controls how capacity and resource values are set and calculated for a queue.
* Effective minimum and maximum resource values are set for each label and resource separately.
*/
public class CapacitySchedulerQueueCapacityHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CapacitySchedulerQueueCapacityHandler.class);
private final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator>
calculators;
private final AbstractQueueCapacityCalculator rootCalculator =
new RootQueueCapacityCalculator();
private final RMNodeLabelsManager labelsManager;
private final Collection<String> definedResources = new LinkedHashSet<>();
public CapacitySchedulerQueueCapacityHandler(RMNodeLabelsManager labelsManager) {
this.calculators = new HashMap<>();
this.labelsManager = labelsManager;
this.calculators.put(ResourceUnitCapacityType.ABSOLUTE,
new AbsoluteResourceCapacityCalculator());
this.calculators.put(ResourceUnitCapacityType.PERCENTAGE,
new PercentageQueueCapacityCalculator());
this.calculators.put(ResourceUnitCapacityType.WEIGHT,
new WeightQueueCapacityCalculator());
loadResourceNames();
}
/**
 * Recalculates the resource and metrics values of every child below the given queue.
 * A fresh update context is created for the run and handed back to the caller.
 *
 * @param clusterResource resource of the cluster
 * @param queue parent queue whose children will be updated
 * @return update context that contains information about the update phase
 */
public QueueCapacityUpdateContext updateChildren(Resource clusterResource, CSQueue queue) {
  ResourceLimits clusterLimits = new ResourceLimits(clusterResource);
  QueueCapacityUpdateContext context =
      new QueueCapacityUpdateContext(clusterResource, labelsManager);

  update(queue, context, clusterLimits);

  return context;
}
/**
 * Recalculates the resource and metrics value of the root queue. Root queue always has
 * percentage capacity type and is assigned the cluster resource as its minimum and maximum
 * effective resource.
 *
 * @param rootQueue root queue
 * @param clusterResource cluster resource
 */
public void updateRoot(CSQueue rootQueue, Resource clusterResource) {
  ResourceLimits rootLimits = new ResourceLimits(clusterResource);
  QueueCapacityUpdateContext context =
      new QueueCapacityUpdateContext(clusterResource, labelsManager);

  RootCalculationDriver driver =
      new RootCalculationDriver(rootQueue, context, rootCalculator, definedResources);
  driver.calculateResources();

  Resource updatedClusterResource = context.getUpdatedClusterResource();
  rootQueue.refreshAfterResourceCalculation(updatedClusterResource, rootLimits);
}
/**
 * Calculates the resources of the children of a queue and refreshes them afterwards.
 * Nothing happens for a null queue or a queue without children.
 *
 * @param queue parent queue whose children are updated
 * @param updateContext context shared by the whole update phase
 * @param resourceLimits resource limits of the parent queue
 */
private void update(
    CSQueue queue, QueueCapacityUpdateContext updateContext, ResourceLimits resourceLimits) {
  if (queue != null && !CollectionUtils.isEmpty(queue.getChildQueues())) {
    ResourceCalculationDriver driver =
        new ResourceCalculationDriver(queue, updateContext, calculators, definedResources);
    driver.calculateResources();
    updateChildrenAfterCalculation(driver, resourceLimits);
  }
}
/**
 * Applies the calculated values to every child of the driver's queue: capacities are updated,
 * the child is refreshed with its own resource limit, then the update continues recursively
 * with the children of the child.
 *
 * @param resourceCalculationDriver driver that performed the calculation for the parent
 * @param resourceLimits resource limits of the parent
 */
private void updateChildrenAfterCalculation(
    ResourceCalculationDriver resourceCalculationDriver, ResourceLimits resourceLimits) {
  ParentQueue parent = (ParentQueue) resourceCalculationDriver.getQueue();
  QueueCapacityUpdateContext context = resourceCalculationDriver.getUpdateContext();
  Resource updatedClusterResource = context.getUpdatedClusterResource();

  for (CSQueue child : parent.getChildQueues()) {
    updateQueueCapacities(resourceCalculationDriver, child);

    ResourceLimits limitOfChild = parent.getResourceLimitsOfChild(
        child, updatedClusterResource, resourceLimits, NO_LABEL, false);
    child.refreshAfterResourceCalculation(updatedClusterResource, limitOfChild);

    update(child, context, limitOfChild);
  }
}
/**
 * Updates the capacity values of the currently evaluated child for each of its configured
 * node labels. The write lock of the queue is held for the duration of the update.
 *
 * @param resourceCalculationDriver driver holding the state of the current calculation
 * @param queue queue on which the capacities are set
 */
private void updateQueueCapacities(
    ResourceCalculationDriver resourceCalculationDriver, CSQueue queue) {
  queue.getWriteLock().lock();
  try {
    for (String label : queue.getConfiguredNodeLabels()) {
      QueueCapacityVector capacityVector = queue.getConfiguredCapacityVector(label);
      if (capacityVector.isMixedCapacityVector()) {
        // Post update capacities based on the calculated effective resource values
        setQueueCapacities(
            resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label),
            queue, label);
      } else {
        // Update capacities according to the legacy logic. The vector fetched above is
        // reused instead of being looked up a second time.
        for (ResourceUnitCapacityType capacityType : capacityVector.getDefinedCapacityTypes()) {
          AbstractQueueCapacityCalculator calculator = calculators.get(capacityType);
          calculator.updateCapacitiesAfterCalculation(resourceCalculationDriver, queue, label);
        }
      }
    }
  } finally {
    queue.getWriteLock().unlock();
  }
}
/**
 * Sets capacity and absolute capacity values of a queue based on minimum and
 * maximum effective resources. Queues that are not an {@code AbstractCSQueue} or that have
 * no parent are left untouched.
 *
 * @param clusterResource overall cluster resource
 * @param queue child queue for which the capacities are set
 * @param label node label
 */
public static void setQueueCapacities(Resource clusterResource, CSQueue queue, String label) {
  if (!(queue instanceof AbstractCSQueue)) {
    return;
  }
  CSQueue parent = queue.getParent();
  if (parent == null) {
    return;
  }
  AbstractCSQueue csQueue = (AbstractCSQueue) queue;
  ResourceCalculator resourceCalculator = csQueue.resourceCalculator;

  // capacity = effectiveMinResource / parent's effectiveMinResource
  float capacity = resourceCalculator.divide(clusterResource,
      queue.getQueueResourceQuotas().getEffectiveMinResource(label),
      parent.getQueueResourceQuotas().getEffectiveMinResource(label));
  // The ratio is not a usable number when the parent has no effective resource: presumably
  // Infinity for a non-zero numerator and NaN for 0 / 0. Both are stored as 0 so that a
  // non-finite float never ends up in the queue capacities.
  queue.getQueueCapacities().setCapacity(label,
      Float.isFinite(capacity) ? capacity : 0);

  // maxCapacity = effectiveMaxResource / parent's effectiveMaxResource
  float maxCapacity = resourceCalculator.divide(clusterResource,
      queue.getQueueResourceQuotas().getEffectiveMaxResource(label),
      parent.getQueueResourceQuotas().getEffectiveMaxResource(label));
  queue.getQueueCapacities().setMaximumCapacity(label,
      Float.isFinite(maxCapacity) ? maxCapacity : 0);

  csQueue.updateAbsoluteCapacities();
}
/**
 * Fills {@code definedResources} with the names of the known resource types. Memory is added
 * first and vcores second (each only when it is a known type), then every remaining type.
 */
private void loadResourceNames() {
  Set<String> remaining = new HashSet<>(ResourceUtils.getResourceTypes().keySet());
  for (String primaryResource : new String[] {MEMORY_URI, VCORES_URI}) {
    // Set#remove reports whether the element was present
    if (remaining.remove(primaryResource)) {
      definedResources.add(primaryResource);
    }
  }
  definedResources.addAll(remaining);
}
}

View File

@ -81,6 +81,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
private CSQueue root;
private final RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
private CapacitySchedulerQueueCapacityHandler queueCapacityHandler;
private QueueStateManager<CSQueue, CapacitySchedulerConfiguration>
queueStateManager;
@ -100,6 +101,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
this.queueStateManager = new QueueStateManager<>();
this.appPriorityACLManager = appPriorityACLManager;
this.configuredNodeLabels = new ConfiguredNodeLabels();
this.queueCapacityHandler = new CapacitySchedulerQueueCapacityHandler(labelManager);
}
@Override
@ -413,6 +415,10 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
return this.queueStateManager;
}
/**
 * Returns the queue capacity handler held by this queue manager.
 *
 * @return queue capacity handler
 */
public CapacitySchedulerQueueCapacityHandler getQueueCapacityHandler() {
return queueCapacityHandler;
}
/**
* Removes an {@code AutoCreatedLeafQueue} from the manager collection and
* from its parent children collection.

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
/**
 * The default rounding strategy for resource calculation. Values are rounded down (floor) for
 * every capacity type except the last one of the given precedence order, for which the value
 * is rounded to the nearest whole number ({@link Math#round(double)}). In the precedence order
 * used by {@code ResourceCalculationDriver} the last type is WEIGHT.
 */
public class DefaultQueueResourceRoundingStrategy implements QueueResourceRoundingStrategy {
  private final ResourceUnitCapacityType lastCapacityType;

  /**
   * Creates a rounding strategy for the given precedence of capacity types.
   *
   * @param capacityTypePrecedence order in which the capacity types are evaluated
   * @throws IllegalArgumentException if the precedence array is empty
   */
  public DefaultQueueResourceRoundingStrategy(
      ResourceUnitCapacityType[] capacityTypePrecedence) {
    if (capacityTypePrecedence.length == 0) {
      throw new IllegalArgumentException("Capacity type precedence collection is empty");
    }
    lastCapacityType = capacityTypePrecedence[capacityTypePrecedence.length - 1];
  }

  /**
   * Rounds the calculated value according to the capacity type of the entry.
   *
   * @param resourceValue calculated resource value
   * @param capacityVectorEntry configured capacity entry
   * @return nearest whole number for the last capacity type, floor otherwise
   */
  @Override
  public double getRoundedResource(
      double resourceValue, QueueCapacityVectorEntry capacityVectorEntry) {
    // Identity comparison is the idiomatic enum check and does not throw when the entry
    // carries no capacity type; such an entry is rounded down like any non-last type.
    if (capacityVectorEntry.getVectorResourceType() == lastCapacityType) {
      return Math.round(resourceValue);
    }
    return Math.floor(resourceValue);
  }
}

View File

@ -302,94 +302,97 @@ public class ParentQueue extends AbstractCSQueue {
void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
writeLock.lock();
try {
QueueCapacityType childrenCapacityType =
getCapacityConfigurationTypeForQueues(childQueues);
QueueCapacityType parentCapacityType =
getCapacityConfigurationTypeForQueues(ImmutableList.of(this));
boolean isLegacyQueueMode = queueContext.getConfiguration().isLegacyQueueMode();
if (isLegacyQueueMode) {
QueueCapacityType childrenCapacityType =
getCapacityConfigurationTypeForQueues(childQueues);
QueueCapacityType parentCapacityType =
getCapacityConfigurationTypeForQueues(ImmutableList.of(this));
if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE
|| parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) {
// We don't allow any mixed absolute + {weight, percentage} between
// children and parent
if (childrenCapacityType != parentCapacityType && !this.getQueuePath()
.equals(CapacitySchedulerConfiguration.ROOT)) {
throw new IOException("Parent=" + this.getQueuePath()
+ ": When absolute minResource is used, we must make sure both "
+ "parent and child all use absolute minResource");
}
// Ensure that for each parent queue: parent.min-resource >=
// Σ(child.min-resource).
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
Resource minRes = Resources.createResource(0, 0);
for (CSQueue queue : childQueues) {
// Accumulate all min/max resource configured for all child queues.
Resources.addTo(minRes, queue.getQueueResourceQuotas()
.getConfiguredMinResource(nodeLabel));
}
Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
queueContext.getClusterResource());
Resource parentMinResource =
usageTracker.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel);
if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
resourceCalculator, resourceByLabel, parentMinResource, minRes)) {
throw new IOException(
"Parent Queues" + " capacity: " + parentMinResource
+ " is less than" + " to its children:" + minRes
+ " for queue:" + getQueueName());
}
}
}
// When child uses percent
if (childrenCapacityType == QueueCapacityType.PERCENT) {
float childrenPctSum = 0;
// check label capacities
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
// check children's labels
childrenPctSum = 0;
for (CSQueue queue : childQueues) {
childrenPctSum += queue.getQueueCapacities().getCapacity(nodeLabel);
if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE
|| parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) {
// We don't allow any mixed absolute + {weight, percentage} between
// children and parent
if (childrenCapacityType != parentCapacityType && !this.getQueuePath()
.equals(CapacitySchedulerConfiguration.ROOT)) {
throw new IOException("Parent=" + this.getQueuePath()
+ ": When absolute minResource is used, we must make sure both "
+ "parent and child all use absolute minResource");
}
if (Math.abs(1 - childrenPctSum) > PRECISION) {
// When children's percent sum != 100%
if (Math.abs(childrenPctSum) > PRECISION) {
// It is wrong when percent sum != {0, 1}
// Ensure that for each parent queue: parent.min-resource >=
// Σ(child.min-resource).
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
Resource minRes = Resources.createResource(0, 0);
for (CSQueue queue : childQueues) {
// Accumulate all min/max resource configured for all child queues.
Resources.addTo(minRes, queue.getQueueResourceQuotas()
.getConfiguredMinResource(nodeLabel));
}
Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
queueContext.getClusterResource());
Resource parentMinResource =
usageTracker.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel);
if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
resourceCalculator, resourceByLabel, parentMinResource, minRes)) {
throw new IOException(
"Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for label="
+ nodeLabel + ". It should be either 0 or 1.0");
} else{
// We also allow children's percent sum = 0 under the following
// conditions
// - Parent uses weight mode
// - Parent uses percent mode, and parent has
// (capacity=0 OR allowZero)
if (parentCapacityType == QueueCapacityType.PERCENT) {
if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
> PRECISION) && (!allowZeroCapacitySum)) {
throw new IOException(
"Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName()
+ " for label=" + nodeLabel
+ ". It is set to 0, but parent percent != 0, and "
+ "doesn't allow children capacity to set to 0");
"Parent Queues" + " capacity: " + parentMinResource
+ " is less than" + " to its children:" + minRes
+ " for queue:" + getQueueName());
}
}
}
// When child uses percent
if (childrenCapacityType == QueueCapacityType.PERCENT) {
float childrenPctSum = 0;
// check label capacities
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
// check children's labels
childrenPctSum = 0;
for (CSQueue queue : childQueues) {
childrenPctSum += queue.getQueueCapacities().getCapacity(nodeLabel);
}
if (Math.abs(1 - childrenPctSum) > PRECISION) {
// When children's percent sum != 100%
if (Math.abs(childrenPctSum) > PRECISION) {
// It is wrong when percent sum != {0, 1}
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for label="
+ nodeLabel + ". It should be either 0 or 1.0");
} else {
// We also allow children's percent sum = 0 under the following
// conditions
// - Parent uses weight mode
// - Parent uses percent mode, and parent has
// (capacity=0 OR allowZero)
if (parentCapacityType == QueueCapacityType.PERCENT) {
if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
> PRECISION) && (!allowZeroCapacitySum)) {
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName()
+ " for label=" + nodeLabel
+ ". It is set to 0, but parent percent != 0, and "
+ "doesn't allow children capacity to set to 0");
}
}
}
}
} else {
// Even if child pct sum == 1.0, we will make sure parent has
// positive percent.
if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs(
queueCapacities.getCapacity(nodeLabel)) <= 0f
&& !allowZeroCapacitySum) {
throw new IOException(
"Illegal capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for label="
+ nodeLabel + ". queue=" + getQueueName()
+ " has zero capacity, but child"
+ "queues have positive capacities");
} else {
// Even if child pct sum == 1.0, we will make sure parent has
// positive percent.
if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs(
queueCapacities.getCapacity(nodeLabel)) <= 0f
&& !allowZeroCapacitySum) {
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
+ " for children of queue " + getQueueName() + " for label="
+ nodeLabel + ". queue=" + getQueueName()
+ " has zero capacity, but child"
+ "queues have positive capacities");
}
}
}
}
@ -1057,7 +1060,7 @@ public class ParentQueue extends AbstractCSQueue {
return accept;
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
public ResourceLimits getResourceLimitsOfChild(CSQueue child,
Resource clusterResource, ResourceLimits parentLimits,
String nodePartition, boolean netLimit) {
// Set resource-limit of a given child, child.limit =
@ -1208,6 +1211,17 @@ public class ParentQueue extends AbstractCSQueue {
}
}
/**
 * Refreshes the statistics and the configured capacity metrics of this queue once its
 * resources have been calculated. The given resource limits are not used here.
 *
 * @param clusterResource resource of the cluster
 * @param resourceLimits resource limits of this queue
 */
@Override
public void refreshAfterResourceCalculation(Resource clusterResource,
    ResourceLimits resourceLimits) {
  CSQueueUtils.updateQueueStatistics(
      resourceCalculator, clusterResource, this, labelManager, null);

  // Configured capacity/max-capacity metrics are updated for the default partition only
  Resource defaultPartitionResource = labelManager.getResourceByLabel(null, clusterResource);
  CSQueueUtils.updateConfiguredCapacityMetrics(
      resourceCalculator, defaultPartitionResource, RMNodeLabelsManager.NO_LABEL, this);
}
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits resourceLimits) {

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
/**
 * Capacity calculator of the {@link ResourceUnitCapacityType#PERCENTAGE} capacity type.
 * The configured percentage is applied to the absolute capacity of the parent and the result
 * is converted to a resource value using the cluster resource of the label.
 */
public class PercentageQueueCapacityCalculator extends AbstractQueueCapacityCalculator {

  /**
   * Calculates the minimum resource as
   * {@code clusterResource * parentAbsoluteMinCapacity * remainingRatio * percentage / 100}.
   */
  @Override
  public double calculateMinimumResource(
      ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
      String label) {
    String resource = context.getResourceName();

    double parentMinCapacity =
        resourceCalculationDriver.getParentAbsoluteMinCapacity(label, resource);
    double remainingRatio =
        resourceCalculationDriver.getRemainingRatioOfResource(label, resource);
    double configuredPercentage =
        context.getCurrentMinimumCapacityEntry(label).getResourceValue();

    double absoluteMinCapacity = parentMinCapacity * remainingRatio * configuredPercentage / 100;

    return resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label)
        .getResourceValue(resource) * absoluteMinCapacity;
  }

  /**
   * Calculates the maximum resource as
   * {@code clusterResource * parentAbsoluteMaxCapacity * percentage / 100}.
   */
  @Override
  public double calculateMaximumResource(
      ResourceCalculationDriver resourceCalculationDriver, CalculationContext context,
      String label) {
    String resource = context.getResourceName();

    double parentMaxCapacity =
        resourceCalculationDriver.getParentAbsoluteMaxCapacity(label, resource);
    double configuredPercentage =
        context.getCurrentMaximumCapacityEntry(label).getResourceValue();

    double absoluteMaxCapacity = parentMaxCapacity * configuredPercentage / 100;

    return resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label)
        .getResourceValue(resource) * absoluteMaxCapacity;
  }

  @Override
  public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
    // Intentionally empty: nothing is prepared in advance for percentage based capacities.
  }

  @Override
  public void updateCapacitiesAfterCalculation(ResourceCalculationDriver resourceCalculationDriver,
      CSQueue queue, String label) {
    AbstractCSQueue csQueue = (AbstractCSQueue) queue;
    csQueue.updateAbsoluteCapacities();
  }

  @Override
  public ResourceUnitCapacityType getCapacityType() {
    return ResourceUnitCapacityType.PERCENTAGE;
  }
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import java.util.ArrayList;
import java.util.List;
/**
 * Holds the values that are shared during one full queue capacity update phase: the cluster
 * resource the update is based on and the warnings collected along the way.
 */
public class QueueCapacityUpdateContext {
  private final List<QueueUpdateWarning> warnings = new ArrayList<>();
  private final RMNodeLabelsManager labelsManager;
  private final Resource updatedClusterResource;

  public QueueCapacityUpdateContext(Resource updatedClusterResource,
      RMNodeLabelsManager labelsManager) {
    this.labelsManager = labelsManager;
    this.updatedClusterResource = updatedClusterResource;
  }

  /**
   * Returns the cluster resource this context was created with, without resolving it for
   * any node label.
   *
   * @return cluster resource
   */
  public Resource getUpdatedClusterResource() {
    return updatedClusterResource;
  }

  /**
   * Returns the cluster resource of the given node label, as resolved by the label manager.
   *
   * @param label node label
   * @return cluster resource of the label
   */
  public Resource getUpdatedClusterResource(String label) {
    return labelsManager.getResourceByLabel(label, updatedClusterResource);
  }

  /**
   * Records a warning that was raised during the update phase.
   *
   * @param warning warning during update phase
   */
  public void addUpdateWarning(QueueUpdateWarning warning) {
    warnings.add(warning);
  }

  /**
   * Returns the warnings recorded so far. The returned list is the one maintained by this
   * context, not a copy.
   *
   * @return update warnings
   */
  public List<QueueUpdateWarning> getUpdateWarnings() {
    return warnings;
  }
}

View File

@ -39,9 +39,9 @@ public class QueueCapacityVector implements
private static final String VALUE_DELIMITER = "=";
private final ResourceVector resource;
private final Map<String, QueueCapacityType> capacityTypes
private final Map<String, ResourceUnitCapacityType> capacityTypes
= new HashMap<>();
private final Map<QueueCapacityType, Set<String>> capacityTypePerResource
private final Map<ResourceUnitCapacityType, Set<String>> capacityTypePerResource
= new HashMap<>();
public QueueCapacityVector() {
@ -61,9 +61,9 @@ public class QueueCapacityVector implements
public static QueueCapacityVector newInstance() {
QueueCapacityVector newCapacityVector =
new QueueCapacityVector(ResourceVector.newInstance());
for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
for (Map.Entry<String, Double> resourceEntry : newCapacityVector.resource) {
newCapacityVector.storeResourceType(resourceEntry.getKey(),
QueueCapacityType.ABSOLUTE);
ResourceUnitCapacityType.ABSOLUTE);
}
return newCapacityVector;
@ -78,10 +78,10 @@ public class QueueCapacityVector implements
* @return uniform capacity vector
*/
public static QueueCapacityVector of(
float value, QueueCapacityType capacityType) {
double value, ResourceUnitCapacityType capacityType) {
QueueCapacityVector newCapacityVector =
new QueueCapacityVector(ResourceVector.of(value));
for (Map.Entry<String, Float> resourceEntry : newCapacityVector.resource) {
for (Map.Entry<String, Double> resourceEntry : newCapacityVector.resource) {
newCapacityVector.storeResourceType(resourceEntry.getKey(), capacityType);
}
@ -109,8 +109,8 @@ public class QueueCapacityVector implements
* @param value value of the resource
* @param capacityType type of the resource
*/
public void setResource(String resourceName, float value,
QueueCapacityType capacityType) {
public void setResource(String resourceName, double value,
ResourceUnitCapacityType capacityType) {
// Necessary due to backward compatibility (memory = memory-mb)
String convertedResourceName = resourceName;
if (resourceName.equals("memory")) {
@ -125,10 +125,14 @@ public class QueueCapacityVector implements
*
* @return value of memory resource
*/
public float getMemory() {
public double getMemory() {
return resource.getValue(ResourceInformation.MEMORY_URI);
}
/**
 * Checks whether the vector holds no data at all: the underlying resource vector is empty
 * and no capacity type is stored for any resource.
 *
 * @return true, if the resource vector and both capacity type mappings are empty
 */
public boolean isEmpty() {
return resource.isEmpty() && capacityTypePerResource.isEmpty() && capacityTypes.isEmpty();
}
/**
* Returns the name of all resources that are defined in the given capacity
* type.
@ -137,13 +141,20 @@ public class QueueCapacityVector implements
* @return all resource names for the given capacity type
*/
public Set<String> getResourceNamesByCapacityType(
QueueCapacityType capacityType) {
return capacityTypePerResource.getOrDefault(capacityType,
Collections.emptySet());
ResourceUnitCapacityType capacityType) {
return new HashSet<>(capacityTypePerResource.getOrDefault(capacityType,
Collections.emptySet()));
}
/**
* Checks whether a resource unit is defined as a specific type.
*
* @param resourceName resource unit name
* @param capacityType capacity type
* @return true, if resource unit is defined as the given type
*/
public boolean isResourceOfType(
String resourceName, QueueCapacityType capacityType) {
String resourceName, ResourceUnitCapacityType capacityType) {
return capacityTypes.containsKey(resourceName) &&
capacityTypes.get(resourceName).equals(capacityType);
}
@ -151,7 +162,7 @@ public class QueueCapacityVector implements
@Override
public Iterator<QueueCapacityVectorEntry> iterator() {
return new Iterator<QueueCapacityVectorEntry>() {
private final Iterator<Map.Entry<String, Float>> resources =
private final Iterator<Map.Entry<String, Double>> resources =
resource.iterator();
private int i = 0;
@ -162,7 +173,7 @@ public class QueueCapacityVector implements
@Override
public QueueCapacityVectorEntry next() {
Map.Entry<String, Float> resourceInformation = resources.next();
Map.Entry<String, Double> resourceInformation = resources.next();
i++;
return new QueueCapacityVectorEntry(
capacityTypes.get(resourceInformation.getKey()),
@ -172,16 +183,29 @@ public class QueueCapacityVector implements
}
/**
* Returns a set of all capacity type defined for this vector.
* Returns a set of all capacity types defined for this vector.
*
* @return capacity types
*/
public Set<QueueCapacityType> getDefinedCapacityTypes() {
public Set<ResourceUnitCapacityType> getDefinedCapacityTypes() {
return capacityTypePerResource.keySet();
}
/**
 * Checks whether the vector is a mixed capacity vector (more than one capacity type is used,
 * therefore it is not uniform).
 *
 * NOTE(review): the result is derived from the number of capacity types returned by
 * getDefinedCapacityTypes(); this presumably assumes that a type whose last resource was
 * reassigned to another type is no longer reported there - confirm in storeResourceType.
 *
 * @return true, if the vector is mixed
 */
public boolean isMixedCapacityVector() {
return getDefinedCapacityTypes().size() > 1;
}
/**
 * Returns the resource names of the underlying resource vector.
 *
 * @return resource names
 */
public Set<String> getResourceNames() {
return resource.getResourceNames();
}
private void storeResourceType(
String resourceName, QueueCapacityType resourceType) {
String resourceName, ResourceUnitCapacityType resourceType) {
if (capacityTypes.get(resourceName) != null
&& !capacityTypes.get(resourceName).equals(resourceType)) {
capacityTypePerResource.get(capacityTypes.get(resourceName))
@ -199,7 +223,7 @@ public class QueueCapacityVector implements
stringVector.append(START_PARENTHESES);
int resourceCount = 0;
for (Map.Entry<String, Float> resourceEntry : resource) {
for (Map.Entry<String, Double> resourceEntry : resource) {
resourceCount++;
stringVector.append(resourceEntry.getKey())
.append(VALUE_DELIMITER)
@ -218,11 +242,11 @@ public class QueueCapacityVector implements
/**
* Represents a capacity type associated with its syntax postfix.
*/
public enum QueueCapacityType {
public enum ResourceUnitCapacityType {
PERCENTAGE("%"), ABSOLUTE(""), WEIGHT("w");
private final String postfix;
QueueCapacityType(String postfix) {
ResourceUnitCapacityType(String postfix) {
this.postfix = postfix;
}
@ -232,22 +256,22 @@ public class QueueCapacityVector implements
}
public static class QueueCapacityVectorEntry {
private final QueueCapacityType vectorResourceType;
private final float resourceValue;
private final ResourceUnitCapacityType vectorResourceType;
private final double resourceValue;
private final String resourceName;
public QueueCapacityVectorEntry(QueueCapacityType vectorResourceType,
String resourceName, float resourceValue) {
public QueueCapacityVectorEntry(ResourceUnitCapacityType vectorResourceType,
String resourceName, double resourceValue) {
this.vectorResourceType = vectorResourceType;
this.resourceValue = resourceValue;
this.resourceName = resourceName;
}
public QueueCapacityType getVectorResourceType() {
public ResourceUnitCapacityType getVectorResourceType() {
return vectorResourceType;
}
public float getResourceValue() {
public double getResourceValue() {
return resourceValue;
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
/**
 * Represents an approach on how to convert a calculated resource from floating point to a whole
 * number. Implementations may choose a different rounding depending on the configured capacity
 * entry of the resource.
 */
public interface QueueResourceRoundingStrategy {
/**
 * Returns a whole number converted from the calculated resource value. The whole number is
 * still returned as a {@code double}.
 *
 * @param resourceValue calculated resource value
 * @param capacityVectorEntry configured capacity entry the value was calculated for
 * @return rounded resource value
 */
double getRoundedResource(double resourceValue, QueueCapacityVectorEntry capacityVectorEntry);
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
/**
 * A warning raised while the capacities of the queues are being updated. It consists of a
 * warning type, the queue it belongs to and an optional free-text info (empty by default).
 */
public class QueueUpdateWarning {
  private final QueueUpdateWarningType warningType;
  private final String queue;
  private String info = "";

  public QueueUpdateWarning(QueueUpdateWarningType queueUpdateWarningType, String queue) {
    this.queue = queue;
    this.warningType = queueUpdateWarningType;
  }

  public QueueUpdateWarningType getWarningType() {
    return warningType;
  }

  public String getQueue() {
    return queue;
  }

  /**
   * Replaces the additional info of this warning.
   *
   * @param info additional info appended to the message
   * @return this warning, to allow chaining
   */
  public QueueUpdateWarning withInfo(String info) {
    this.info = info;
    return this;
  }

  /**
   * Renders the message template of the warning type with the queue and the info.
   */
  @Override
  public String toString() {
    String template = warningType.getTemplate();
    return String.format(template, queue, info);
  }

  /**
   * The kinds of warnings, each with a message template that takes the queue as first and
   * the additional info as second argument.
   */
  public enum QueueUpdateWarningType {
    BRANCH_UNDERUTILIZED("Remaining resource found in branch under parent queue '%s'. %s"),
    QUEUE_OVERUTILIZED("Queue '%s' is configured to use more resources than what is available " +
        "under its parent. %s"),
    QUEUE_ZERO_RESOURCE("Queue '%s' is assigned zero resource. %s"),
    BRANCH_DOWNSCALED("Child queues with absolute configured capacity under parent queue '%s' are" +
        " downscaled due to insufficient cluster resource. %s"),
    QUEUE_EXCEEDS_MAX_RESOURCE("Queue '%s' exceeds its maximum available resources. %s"),
    QUEUE_MAX_RESOURCE_EXCEEDS_PARENT("Maximum resources of queue '%s' are greater than its " +
        "parent's. %s");

    private final String template;

    QueueUpdateWarningType(String template) {
      this.template = template;
    }

    public String getTemplate() {
      return template;
    }

    /**
     * Creates a warning of this type for the given queue.
     *
     * @param queue queue the warning belongs to
     * @return new warning without additional info
     */
    public QueueUpdateWarning ofQueue(String queue) {
      return new QueueUpdateWarning(this, queue);
    }
  }
}

View File

@ -0,0 +1,336 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
/**
* Drives the main logic of resource calculation for all children under a queue. Acts as a
* bookkeeper of disposable update information that is used by all children under the common parent.
*/
public class ResourceCalculationDriver {
// Order in which the capacity types are evaluated for every resource in calculateResources().
private static final ResourceUnitCapacityType[] CALCULATOR_PRECEDENCE =
new ResourceUnitCapacityType[] {
ResourceUnitCapacityType.ABSOLUTE,
ResourceUnitCapacityType.PERCENTAGE,
ResourceUnitCapacityType.WEIGHT};
// Unit suffix constant (presumably mebibytes, judging by the name - confirm); it is not
// referenced anywhere inside this class.
static final String MB_UNIT = "Mi";
// Rounds the calculated minimum and maximum values before they are validated and stored.
protected final QueueResourceRoundingStrategy roundingStrategy =
new DefaultQueueResourceRoundingStrategy(CALCULATOR_PRECEDENCE);
// The parent queue whose children are calculated by this driver.
protected final CSQueue queue;
// Collects the warnings raised during the calculation and provides the cluster resource.
protected final QueueCapacityUpdateContext updateContext;
// Calculator implementations looked up by capacity type.
protected final Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator> calculators;
// Names of the resources iterated by calculateResources().
protected final Collection<String> definedResources;
// Remaining resource per label, decremented right after each child has been calculated.
protected final Map<String, ResourceVector> overallRemainingResourcePerLabel = new HashMap<>();
// Remaining resource per label, decremented only after a whole capacity type phase has finished.
protected final Map<String, ResourceVector> batchRemainingResourcePerLabel = new HashMap<>();
// Used by ABSOLUTE capacity types
protected final Map<String, ResourceVector> normalizedResourceRatioPerLabel = new HashMap<>();
// Used by WEIGHT capacity types
protected final Map<String, Map<String, Double>> sumWeightsPerLabel = new HashMap<>();
// Resource used per label during the current capacity type phase; replaced by a fresh map every
// time a phase is flushed into batchRemainingResourcePerLabel.
protected Map<String, Double> usedResourceByCurrentCalculatorPerLabel = new HashMap<>();
/**
* Creates a driver for the children of the given parent queue.
*
* @param queue parent queue whose children are calculated
* @param updateContext context that is used throughout the update phase
* @param calculators capacity calculators keyed by capacity type
* @param definedResources names of the resources to calculate
*/
public ResourceCalculationDriver(
CSQueue queue, QueueCapacityUpdateContext updateContext,
Map<ResourceUnitCapacityType, AbstractQueueCapacityCalculator> calculators,
Collection<String> definedResources) {
this.queue = queue;
this.updateContext = updateContext;
this.calculators = calculators;
this.definedResources = definedResources;
}
/**
* Returns the parent queue whose children are calculated by this driver.
*
* @return the common parent queue
*/
public CSQueue getQueue() {
return queue;
}
/**
* Returns all the children defined under the driver parent queue, as reported by the parent
* queue itself.
*
* @return child queues
*/
public Collection<CSQueue> getChildQueues() {
return queue.getChildQueues();
}
/**
* Returns the context that is used throughout the whole update phase. It is the object the
* driver was constructed with.
*
* @return update context
*/
public QueueCapacityUpdateContext getUpdateContext() {
return updateContext;
}
/**
 * Adds the given weight to the running total kept for a resource under a node label. Missing
 * entries are created on the fly, starting from zero.
 *
 * @param label node label
 * @param resourceName resource unit name
 * @param value weight value to add
 */
public void incrementWeight(String label, String resourceName, double value) {
  Map<String, Double> weightsOfLabel =
      sumWeightsPerLabel.computeIfAbsent(label, key -> new HashMap<>());
  double previousSum = weightsOfLabel.getOrDefault(resourceName, 0d);
  weightsOfLabel.put(resourceName, previousSum + value);
}
/**
 * Looks up the weight total accumulated through {@code incrementWeight} for a resource under a
 * node label. A {@code NullPointerException} is raised if nothing was recorded for the label or
 * for the resource.
 *
 * @param label node label
 * @param resourceName resource unit name
 * @return aggregated weights of children
 */
public double getSumWeightsByResource(String label, String resourceName) {
  Map<String, Double> weightsOfLabel = sumWeightsPerLabel.get(label);
  return weightsOfLabel.get(resourceName);
}
/**
* Returns the ratio of the sum of the children's absolute configured resources and the parent's
* effective minimum resource. The map is not populated by this class, and the returned object
* is the driver's internal map, not a copy.
*
* @return normalized resource ratio for all labels
*/
public Map<String, ResourceVector> getNormalizedResourceRatios() {
return normalizedResourceRatioPerLabel;
}
/**
 * Computes how large a share of the parent's effective capacity is still unallocated for the
 * given resource. The remaining amount is taken from the batch tracker, which is only
 * decremented once a capacity type has been fully evaluated.
 *
 * @param label node label
 * @param resourceName name of resource unit
 * @return remaining resource divided by the parent's effective capacity
 */
public double getRemainingRatioOfResource(String label, String resourceName) {
  double remaining = batchRemainingResourcePerLabel.get(label).getValue(resourceName);
  long parentCapacity = queue.getEffectiveCapacity(label).getResourceValue(resourceName);
  return remaining / parentCapacity;
}
/**
 * Computes the parent queue's effective minimum resource as a fraction of the updated cluster
 * resource of the label.
 *
 * @param label node label
 * @param resourceName name of resource unit
 * @return absolute minimum capacity
 */
public double getParentAbsoluteMinCapacity(String label, String resourceName) {
  long parentMinimum = queue.getEffectiveCapacity(label).getResourceValue(resourceName);
  long clusterTotal =
      getUpdateContext().getUpdatedClusterResource(label).getResourceValue(resourceName);
  return (double) parentMinimum / clusterTotal;
}
/**
 * Computes the parent queue's effective maximum resource as a fraction of the updated cluster
 * resource of the label.
 *
 * @param label node label
 * @param resourceName name of resource unit
 * @return absolute maximum capacity
 */
public double getParentAbsoluteMaxCapacity(String label, String resourceName) {
  long parentMaximum = queue.getEffectiveMaxCapacity(label).getResourceValue(resourceName);
  long clusterTotal =
      getUpdateContext().getUpdatedClusterResource(label).getResourceValue(resourceName);
  return (double) parentMaximum / clusterTotal;
}
/**
 * Returns the remaining resources of a parent that are still available for its children. The
 * value is decremented only after a calculator has finished its work on the corresponding
 * resource. If the label has no entry yet, a vector created by
 * {@code ResourceVector.newInstance()} is registered for it and returned.
 *
 * @param label node label
 * @return remaining resources
 */
public ResourceVector getBatchRemainingResource(String label) {
  // The default vector is built only when the label is missing, so no throwaway vector is
  // allocated for labels that already have an entry, and the map is consulted only once.
  return batchRemainingResourcePerLabel.computeIfAbsent(label,
      key -> ResourceVector.newInstance());
}
/**
 * Computes and stores the effective minimum and maximum resources of every child of the parent
 * queue this driver was created for. Resources are processed one by one, and for each resource
 * the capacity types are evaluated in the order given by {@code CALCULATOR_PRECEDENCE}.
 */
public void calculateResources() {
  // Both remaining-resource trackers start from the parent's effective capacity
  for (String label : queue.getConfiguredNodeLabels()) {
    overallRemainingResourcePerLabel.put(label,
        ResourceVector.of(queue.getEffectiveCapacity(label)));
    batchRemainingResourcePerLabel.put(label,
        ResourceVector.of(queue.getEffectiveCapacity(label)));
  }

  for (AbstractQueueCapacityCalculator calculator : calculators.values()) {
    calculator.calculateResourcePrerequisites(this);
  }

  for (String resourceName : definedResources) {
    for (ResourceUnitCapacityType capacityType : CALCULATOR_PRECEDENCE) {
      for (CSQueue childQueue : getChildQueues()) {
        calculateResourceOnChild(
            new CalculationContext(resourceName, capacityType, childQueue));
      }

      // A calculator phase has ended: move the usage it accumulated into the batch tracker
      // and start the next phase with an empty accumulator
      usedResourceByCurrentCalculatorPerLabel.forEach((label, usedResource) ->
          batchRemainingResourcePerLabel.get(label).decrement(resourceName, usedResource));
      usedResourceByCurrentCalculatorPerLabel = new HashMap<>();
    }
  }

  validateRemainingResource();
}
/**
 * Calculates the resource named in the context for one child queue, for every label of the
 * child on which that resource is configured with the capacity type of the context. The child's
 * write lock is held for the duration of the calculation.
 *
 * @param context resource name, capacity type and child queue to work on
 */
private void calculateResourceOnChild(CalculationContext context) {
  context.getQueue().getWriteLock().lock();
  try {
    for (String label : context.getQueue().getConfiguredNodeLabels()) {
      boolean handledInThisPhase = context.getQueue().getConfiguredCapacityVector(label)
          .isResourceOfType(context.getResourceName(), context.getCapacityType());
      if (handledInThisPhase) {
        double usedByChild = setChildResources(context, label);

        // The overall tracker is updated immediately, the batch tracker only sees the
        // accumulated usage once the whole phase is over
        overallRemainingResourcePerLabel.get(label).decrement(context.getResourceName(),
            usedByChild);
        double usedSoFar = usedResourceByCurrentCalculatorPerLabel.getOrDefault(label, 0d);
        usedResourceByCurrentCalculatorPerLabel.put(label, usedSoFar + usedByChild);
      }
    }
  } finally {
    context.getQueue().getWriteLock().unlock();
  }
}
/**
 * Calculates, rounds, validates and stores the effective minimum and maximum value of one
 * resource of a child queue under the given label.
 *
 * @param context resource name, capacity type and child queue to work on
 * @param label node label
 * @return the validated minimum resource that was stored for the child
 */
private double setChildResources(CalculationContext context, String label) {
  String resourceName = context.getResourceName();
  QueueCapacityVectorEntry minimumEntry =
      context.getQueue().getConfiguredCapacityVector(label).getResource(resourceName);
  QueueCapacityVectorEntry maximumEntry =
      context.getQueue().getConfiguredMaxCapacityVector(label).getResource(resourceName);

  // The maximum may be configured with a different capacity type than the minimum
  AbstractQueueCapacityCalculator maximumCalculator =
      calculators.get(maximumEntry.getVectorResourceType());
  AbstractQueueCapacityCalculator minimumCalculator =
      calculators.get(context.getCapacityType());

  double calculatedMinimum = minimumCalculator.calculateMinimumResource(this, context, label);
  double calculatedMaximum = maximumCalculator.calculateMaximumResource(this, context, label);

  Pair<Double, Double> validated = validateCalculatedResources(context, label,
      new ImmutablePair<>(
          roundingStrategy.getRoundedResource(calculatedMinimum, minimumEntry),
          roundingStrategy.getRoundedResource(calculatedMaximum, maximumEntry)));
  double effectiveMinimum = validated.getLeft();
  double effectiveMaximum = validated.getRight();

  context.getQueue().getQueueResourceQuotas().getEffectiveMinResource(label)
      .setResourceValue(resourceName, (long) effectiveMinimum);
  context.getQueue().getQueueResourceQuotas().getEffectiveMaxResource(label)
      .setResourceValue(resourceName, (long) effectiveMaximum);

  return effectiveMinimum;
}
/**
* Adjusts the calculated minimum and maximum value of one resource of a child queue so that
* they respect the limits of the parent, and records an update warning for the adjustments.
*
* @param context resource name, capacity type and child queue being calculated
* @param label node label
* @param calculatedResources pair of the calculated (minimum, maximum) values
* @return pair of the adjusted (minimum, maximum) values
*/
private Pair<Double, Double> validateCalculatedResources(CalculationContext context,
String label, Pair<Double, Double> calculatedResources) {
double minimumResource = calculatedResources.getLeft();
// Effective minimum memory currently stored on the child for this label
long minimumMemoryResource =
context.getQueue().getQueueResourceQuotas().getEffectiveMinResource(label).getMemorySize();
// What is still unallocated under the parent, taking every already calculated child into account
double remainingResourceUnderParent = overallRemainingResourcePerLabel.get(label).getValue(
context.getResourceName());
long parentMaximumResource = queue.getEffectiveMaxCapacity(label).getResourceValue(
context.getResourceName());
double maximumResource = calculatedResources.getRight();
// Memory is the primary resource, if it is zero, all other resource units are zero as well.
// NOTE(review): this reads the stored effective minimum memory, so it presumably relies on
// memory being calculated before the other resources - confirm the order of definedResources.
if (!context.getResourceName().equals(MEMORY_URI) && minimumMemoryResource == 0) {
minimumResource = 0;
}
if (maximumResource != 0 && maximumResource > parentMaximumResource) {
updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT
.ofQueue(context.getQueue().getQueuePath()));
}
// A calculated maximum of zero is replaced by the parent's maximum, any other value is capped
// at the parent's maximum
maximumResource = maximumResource == 0 ? parentMaximumResource : Math.min(maximumResource,
parentMaximumResource);
// The minimum must not be greater than the (already capped) maximum
if (maximumResource < minimumResource) {
updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_EXCEEDS_MAX_RESOURCE.ofQueue(
context.getQueue().getQueuePath()));
minimumResource = maximumResource;
}
// The minimum must not be greater than what is left under the parent
if (minimumResource > remainingResourceUnderParent) {
// Legacy auto queues are assigned a zero resource if not enough resource is left
if (queue instanceof ManagedParentQueue) {
minimumResource = 0;
} else {
updateContext.addUpdateWarning(
QueueUpdateWarningType.QUEUE_OVERUTILIZED.ofQueue(
context.getQueue().getQueuePath()).withInfo(
"Resource name: " + context.getResourceName() +
" resource value: " + minimumResource));
minimumResource = remainingResourceUnderParent;
}
}
// Checked last, so it also reports a minimum that was reduced to zero by the steps above
if (minimumResource == 0) {
updateContext.addUpdateWarning(QueueUpdateWarningType.QUEUE_ZERO_RESOURCE.ofQueue(
context.getQueue().getQueuePath())
.withInfo("Resource name: " + context.getResourceName()));
}
return new ImmutablePair<>(minimumResource, maximumResource);
}
/**
 * Records a {@code BRANCH_UNDERUTILIZED} warning for every label of the parent whose batch
 * remaining vector differs from a freshly created {@code ResourceVector.newInstance()}
 * (presumably the all-zero vector, i.e. something was left undistributed).
 */
private void validateRemainingResource() {
  for (String label : queue.getConfiguredNodeLabels()) {
    ResourceVector leftover = batchRemainingResourcePerLabel.get(label);
    if (leftover.equals(ResourceVector.newInstance())) {
      continue;
    }
    updateContext.addUpdateWarning(
        QueueUpdateWarningType.BRANCH_UNDERUTILIZED.ofQueue(queue.getQueuePath())
            .withInfo("Label: " + label));
  }
}
}

View File

@ -25,13 +25,13 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
* Represents a simple resource floating point value storage
* grouped by resource names.
* Represents a simple resource floating point value grouped by resource names.
*/
public class ResourceVector implements Iterable<Map.Entry<String, Float>> {
private final Map<String, Float> resourcesByName = new HashMap<>();
public class ResourceVector implements Iterable<Map.Entry<String, Double>> {
private final Map<String, Double> resourcesByName = new HashMap<>();
/**
* Creates a new {@code ResourceVector} with all pre-defined resources set to
@ -53,7 +53,7 @@ public class ResourceVector implements Iterable<Map.Entry<String, Float>> {
* @param value the value to set all resources to
* @return uniform resource vector
*/
public static ResourceVector of(float value) {
public static ResourceVector of(double value) {
ResourceVector emptyResourceVector = new ResourceVector();
for (ResourceInformation resource : ResourceUtils.getResourceTypesArray()) {
emptyResourceVector.setValue(resource.getName(), value);
@ -79,34 +79,51 @@ public class ResourceVector implements Iterable<Map.Entry<String, Float>> {
}
/**
* Subtract values for each resource defined in the given resource vector.
* Decrements values for each resource defined in the given resource vector.
* @param otherResourceVector rhs resource vector of the subtraction
*/
public void subtract(ResourceVector otherResourceVector) {
for (Map.Entry<String, Float> resource : otherResourceVector) {
public void decrement(ResourceVector otherResourceVector) {
for (Map.Entry<String, Double> resource : otherResourceVector) {
setValue(resource.getKey(), getValue(resource.getKey()) - resource.getValue());
}
}
/**
 * Subtracts the specified value from the current value of the given resource and stores the
 * result.
 * @param resourceName name of the resource
 * @param value value to be subtracted from the resource's current value
 */
public void decrement(String resourceName, double value) {
  double current = getValue(resourceName);
  setValue(resourceName, current - value);
}
/**
* Increments the given resource by the specified value.
* @param resourceName name of the resource
* @param value value to be added to the resource's current value
*/
public void increment(String resourceName, float value) {
public void increment(String resourceName, double value) {
setValue(resourceName, getValue(resourceName) + value);
}
public Float getValue(String resourceName) {
public double getValue(String resourceName) {
return resourcesByName.get(resourceName);
}
public void setValue(String resourceName, float value) {
public void setValue(String resourceName, double value) {
resourcesByName.put(resourceName, value);
}
/**
* Tells whether no resource has a value stored in this vector.
* @return true if the vector holds no resource entries
*/
public boolean isEmpty() {
return resourcesByName.isEmpty();
}
/**
* Returns the names of the resources that have a value stored in this vector. The returned set
* is the key set view of the internal map, not a copy.
* @return names of the stored resources
*/
public Set<String> getResourceNames() {
return resourcesByName.keySet();
}
@Override
public Iterator<Map.Entry<String, Float>> iterator() {
public Iterator<Map.Entry<String, Double>> iterator() {
return resourcesByName.entrySet().iterator();
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Collection;
import java.util.Collections;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.PERCENTAGE;
/**
 * Calculation driver specialised for the root queue: instead of distributing a parent's
 * resources among children, it sets the effective resources of the root queue itself with the
 * help of a single dedicated calculator.
 */
public final class RootCalculationDriver extends ResourceCalculationDriver {
  private final AbstractQueueCapacityCalculator rootCalculator;

  /**
   * Creates the driver of the root queue. No per-type calculators are handed to the base class.
   *
   * @param rootQueue the root queue
   * @param updateContext context that is used throughout the update phase
   * @param rootCalculator calculator used for every resource of the root queue
   * @param definedResources names of the defined resources
   */
  public RootCalculationDriver(CSQueue rootQueue, QueueCapacityUpdateContext updateContext,
      AbstractQueueCapacityCalculator rootCalculator, Collection<String> definedResources) {
    super(rootQueue, updateContext, Collections.emptyMap(), definedResources);
    this.rootCalculator = rootCalculator;
  }

  /**
   * Sets the effective minimum and maximum value of every resource configured on the root queue
   * for each of its labels, updates the capacities of the label and finally runs the
   * prerequisite calculation of the root calculator.
   */
  @Override
  public void calculateResources() {
    for (String label : queue.getConfiguredNodeLabels()) {
      for (QueueCapacityVector.QueueCapacityVectorEntry minimumEntry :
          queue.getConfiguredCapacityVector(label)) {
        String resourceName = minimumEntry.getResourceName();
        CalculationContext context = new CalculationContext(resourceName, PERCENTAGE, queue);

        double calculatedMinimum =
            rootCalculator.calculateMinimumResource(this, context, label);
        double calculatedMaximum =
            rootCalculator.calculateMaximumResource(this, context, label);

        long effectiveMinimum =
            (long) roundingStrategy.getRoundedResource(calculatedMinimum, minimumEntry);
        QueueCapacityVector.QueueCapacityVectorEntry maximumEntry =
            queue.getConfiguredMaxCapacityVector(label).getResource(resourceName);
        long effectiveMaximum =
            (long) roundingStrategy.getRoundedResource(calculatedMaximum, maximumEntry);

        queue.getQueueResourceQuotas().getEffectiveMinResource(label)
            .setResourceValue(resourceName, effectiveMinimum);
        queue.getQueueResourceQuotas().getEffectiveMaxResource(label)
            .setResourceValue(resourceName, effectiveMaximum);
      }

      rootCalculator.updateCapacitiesAfterCalculation(this, queue, label);
    }

    rootCalculator.calculateResourcePrerequisites(this);
  }
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.PERCENTAGE;
/**
 * Capacity calculator of the root queue. Both the minimum and the maximum of a resource equal
 * the updated cluster resource of the label.
 */
public class RootQueueCapacityCalculator extends AbstractQueueCapacityCalculator {

  @Override
  public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
    AbsoluteResourceCapacityCalculator.setNormalizedResourceRatio(resourceCalculationDriver);
  }

  @Override
  public double calculateMinimumResource(ResourceCalculationDriver resourceCalculationDriver,
      CalculationContext context, String label) {
    return clusterResourceOf(resourceCalculationDriver, context, label);
  }

  @Override
  public double calculateMaximumResource(ResourceCalculationDriver resourceCalculationDriver,
      CalculationContext context, String label) {
    return clusterResourceOf(resourceCalculationDriver, context, label);
  }

  /**
   * Sets the absolute capacity of the label to 1 and, when the configured weight of the label is
   * exactly 1, the normalized weight to 1 as well.
   */
  @Override
  public void updateCapacitiesAfterCalculation(
      ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
    queue.getQueueCapacities().setAbsoluteCapacity(label, 1);

    boolean hasUnitWeight = queue.getQueueCapacities().getWeight(label) == 1;
    if (hasUnitWeight) {
      queue.getQueueCapacities().setNormalizedWeight(label, 1);
    }
  }

  @Override
  public ResourceUnitCapacityType getCapacityType() {
    return PERCENTAGE;
  }

  /**
   * Reads the value of the context's resource from the updated cluster resource of the label.
   */
  private static double clusterResourceOf(ResourceCalculationDriver resourceCalculationDriver,
      CalculationContext context, String label) {
    return resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label)
        .getResourceValue(context.getResourceName());
  }
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import java.util.Collection;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType.WEIGHT;
/**
 * Capacity calculator of resources configured with the WEIGHT capacity type. A child receives
 * a share of the parent's remaining resource that is proportional to its weight relative to
 * the summed weight of its siblings.
 */
public class WeightQueueCapacityCalculator extends AbstractQueueCapacityCalculator {

  /**
   * Sums up the weights of all children per label and resource before the calculation starts.
   */
  @Override
  public void calculateResourcePrerequisites(ResourceCalculationDriver resourceCalculationDriver) {
    for (CSQueue childQueue : resourceCalculationDriver.getChildQueues()) {
      for (String label : childQueue.getConfiguredNodeLabels()) {
        for (String resourceName : childQueue.getConfiguredCapacityVector(label)
            .getResourceNamesByCapacityType(getCapacityType())) {
          double configuredWeight = childQueue.getConfiguredCapacityVector(label)
              .getResource(resourceName).getResourceValue();
          resourceCalculationDriver.incrementWeight(label, resourceName, configuredWeight);
        }
      }
    }
  }

  @Override
  public double calculateMinimumResource(ResourceCalculationDriver resourceCalculationDriver,
      CalculationContext context, String label) {
    String resourceName = context.getResourceName();

    double configuredWeight = context.getCurrentMinimumCapacityEntry(label).getResourceValue();
    double normalizedWeight = configuredWeight
        / resourceCalculationDriver.getSumWeightsByResource(label, resourceName);
    double remainingResource =
        resourceCalculationDriver.getBatchRemainingResource(label).getValue(resourceName);

    // The only weighted queue takes everything that is left, which avoids losing resources
    // to rounding
    if (normalizedWeight == 1) {
      return remainingResource;
    }

    double remainingResourceRatio =
        resourceCalculationDriver.getRemainingRatioOfResource(label, resourceName);
    double parentAbsoluteCapacity =
        resourceCalculationDriver.getParentAbsoluteMinCapacity(label, resourceName);
    double queueAbsoluteCapacity =
        parentAbsoluteCapacity * remainingResourceRatio * normalizedWeight;

    return resourceCalculationDriver.getUpdateContext().getUpdatedClusterResource(label)
        .getResourceValue(resourceName) * queueAbsoluteCapacity;
  }

  /**
   * WEIGHT is not a supported maximum capacity type, therefore this method always fails.
   */
  @Override
  public double calculateMaximumResource(ResourceCalculationDriver resourceCalculationDriver,
      CalculationContext context, String label) {
    String resourceName = context.getCurrentMinimumCapacityEntry(label).getResourceName();
    throw new IllegalStateException("Resource " + resourceName
        + " has WEIGHT maximum capacity type, which is not supported");
  }

  @Override
  public ResourceUnitCapacityType getCapacityType() {
    return WEIGHT;
  }

  /**
   * Stores the average of the per-resource normalized weights as the normalized weight of the
   * label, then refreshes the absolute capacities of the queue.
   */
  @Override
  public void updateCapacitiesAfterCalculation(
      ResourceCalculationDriver resourceCalculationDriver, CSQueue queue, String label) {
    Collection<String> weightedResources = getResourceNames(queue, label);

    double normalizedWeightSum = 0d;
    for (String resourceName : weightedResources) {
      double branchWeightSum =
          resourceCalculationDriver.getSumWeightsByResource(label, resourceName);
      double configuredWeight =
          queue.getConfiguredCapacityVector(label).getResource(resourceName).getResourceValue();
      normalizedWeightSum += configuredWeight / branchWeightSum;
    }

    float averageNormalizedWeight = (float) (normalizedWeightSum / weightedResources.size());
    queue.getQueueCapacities().setNormalizedWeight(label, averageNormalizedWeight);
    ((AbstractCSQueue) queue).updateAbsoluteCapacities();
  }
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import java.util.ArrayList;
@ -61,22 +61,16 @@ public class QueueCapacityConfigParser {
/**
* Creates a {@code QueueCapacityVector} parsed from the capacity configuration
* property set for a queue.
* @param conf configuration object
* @param capacityString capacity string to parse
* @param queuePath queue for which the capacity property is parsed
* @param label node label
* @return a parsed capacity vector
*/
public QueueCapacityVector parse(CapacitySchedulerConfiguration conf,
String queuePath, String label) {
public QueueCapacityVector parse(String capacityString, String queuePath) {
if (queuePath.equals(CapacitySchedulerConfiguration.ROOT)) {
return QueueCapacityVector.of(100f, QueueCapacityType.PERCENTAGE);
return QueueCapacityVector.of(100f, ResourceUnitCapacityType.PERCENTAGE);
}
String propertyName = CapacitySchedulerConfiguration.getNodeLabelPrefix(
queuePath, label) + CapacitySchedulerConfiguration.CAPACITY;
String capacityString = conf.get(propertyName);
if (capacityString == null) {
return new QueueCapacityVector();
}
@ -101,13 +95,13 @@ public class QueueCapacityConfigParser {
* @return a parsed capacity vector
*/
private QueueCapacityVector uniformParser(Matcher matcher) {
QueueCapacityType capacityType = null;
ResourceUnitCapacityType capacityType = null;
String value = matcher.group(1);
if (matcher.groupCount() == 2) {
String matchedSuffix = matcher.group(2);
for (QueueCapacityType suffix : QueueCapacityType.values()) {
for (ResourceUnitCapacityType suffix : ResourceUnitCapacityType.values()) {
// Absolute uniform syntax is not supported
if (suffix.equals(QueueCapacityType.ABSOLUTE)) {
if (suffix.equals(ResourceUnitCapacityType.ABSOLUTE)) {
continue;
}
// when capacity is given in percentage, we do not need % symbol
@ -164,7 +158,7 @@ public class QueueCapacityConfigParser {
private void setCapacityVector(
QueueCapacityVector resource, String resourceName, String resourceValue) {
QueueCapacityType capacityType = QueueCapacityType.ABSOLUTE;
ResourceUnitCapacityType capacityType = ResourceUnitCapacityType.ABSOLUTE;
// Extract suffix from a value e.g. for 6w extract w
String suffix = resourceValue.replaceAll(FLOAT_DIGIT_REGEX, "");
@ -180,7 +174,7 @@ public class QueueCapacityConfigParser {
// Convert all incoming units to MB if units is configured.
convertedValue = UnitsConversionUtil.convert(suffix, "Mi", (long) parsedResourceValue);
} else {
for (QueueCapacityType capacityTypeSuffix : QueueCapacityType.values()) {
for (ResourceUnitCapacityType capacityTypeSuffix : ResourceUnitCapacityType.values()) {
if (capacityTypeSuffix.getPostfix().equals(suffix)) {
capacityType = capacityTypeSuffix;
}
@ -198,8 +192,12 @@ public class QueueCapacityConfigParser {
* false otherwise
*/
public boolean isCapacityVectorFormat(String configuredCapacity) {
return configuredCapacity != null
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
if (configuredCapacity == null) {
return false;
}
String formattedCapacityString = configuredCapacity.replaceAll(" ", "");
return RESOURCE_PATTERN.matcher(formattedCapacityString).find();
}
private static class Parser {

View File

@ -27,9 +27,11 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.nodelabels.NodeLabelsStore;
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
public class NullRMNodeLabelsManager extends RMNodeLabelsManager {
Map<NodeId, Set<String>> lastNodeToLabels = null;
@ -98,4 +100,24 @@ public class NullRMNodeLabelsManager extends RMNodeLabelsManager {
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
super.serviceInit(conf);
}
/**
 * Replaces the label entry of this manager with a fake label that carries the given resource.
 * The empty label is stored in {@code noNodeLabel}, every other label in
 * {@code labelCollections}.
 *
 * @param label node label
 * @param resource resource the label should report
 */
public void setResourceForLabel(String label, Resource resource) {
  if (label.equals(NO_LABEL)) {
    noNodeLabel = new FakeLabel(resource);
  } else {
    labelCollections.put(label, new FakeLabel(label, resource));
  }
}
/**
 * Label created directly with a resource; the remaining super constructor arguments are fixed
 * to 1 and false.
 */
private static class FakeLabel extends RMNodeLabel {
  FakeLabel(Resource resource) {
    this(NO_LABEL, resource);
  }

  FakeLabel(String label, Resource resource) {
    super(label, resource, 1, false);
  }
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Before;
import java.io.IOException;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.GB;
/**
 * Shared fixture of the queue resource calculation tests. It starts a {@code MockRM} with a
 * small queue hierarchy and offers helpers to run a capacity update and to build capacity
 * vector strings.
 */
public class CapacitySchedulerQueueCalculationTestBase {
  protected static final String A = "root.a";
  protected static final String A1 = "root.a.a1";
  protected static final String A11 = "root.a.a1.a11";
  protected static final String A12 = "root.a.a1.a12";
  protected static final String A2 = "root.a.a2";
  protected static final String B = "root.b";
  protected static final String B1 = "root.b.b1";
  protected static final String C = "root.c";

  private static final String CAPACITY_VECTOR_TEMPLATE = "[memory=%s, vcores=%s]";

  protected ResourceCalculator resourceCalculator;

  protected MockRM mockRM;
  protected CapacityScheduler cs;
  protected CapacitySchedulerConfiguration csConf;
  protected NullRMNodeLabelsManager mgr;

  @Before
  public void setUp() throws Exception {
    csConf = new CapacitySchedulerConfiguration();
    csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
        ResourceScheduler.class);

    // Initial hierarchy: root -> (a -> (a1 -> (a11, a12), a2), b)
    csConf.setQueues("root", new String[]{"a", "b"});
    csConf.setCapacity(A, 50f);
    csConf.setCapacity(B, 50f);
    csConf.setQueues(A, new String[]{"a1", "a2"});
    csConf.setCapacity(A1, 100f);
    csConf.setQueues(A1, new String[]{"a11", "a12"});
    csConf.setCapacity(A11, 50f);
    csConf.setCapacity(A12, 50f);

    mgr = new NullRMNodeLabelsManager();
    mgr.init(csConf);

    // The resource manager is forced to use the label manager owned by the test
    mockRM = new MockRM(csConf) {
      protected RMNodeLabelsManager createNodeLabelManager() {
        return mgr;
      }
    };

    cs = (CapacityScheduler) mockRM.getResourceScheduler();
    cs.updatePlacementRules();
    mockRM.start();
    cs.start();
    mockRM.registerNode("h1:1234", 10 * GB);

    resourceCalculator = cs.getResourceCalculator();
  }

  /**
   * Runs a capacity update in which the empty label has the same resource as the cluster.
   */
  protected QueueCapacityUpdateContext update(
      QueueAssertionBuilder assertions, Resource clusterResource) throws IOException {
    return update(assertions, clusterResource, clusterResource);
  }

  /**
   * Reinitializes the scheduler with the current configuration, recalculates the root queue
   * and its descendants, then evaluates the given assertions.
   *
   * @param assertions assertions to finish after the update
   * @param clusterResource resource of the cluster
   * @param emptyLabelResource resource assigned to the empty node label
   * @return the context produced by the update of the children
   */
  protected QueueCapacityUpdateContext update(
      QueueAssertionBuilder assertions, Resource clusterResource, Resource emptyLabelResource)
      throws IOException {
    cs.reinitialize(csConf, mockRM.getRMContext());

    CapacitySchedulerQueueCapacityHandler capacityHandler =
        new CapacitySchedulerQueueCapacityHandler(mgr);
    mgr.setResourceForLabel(CommonNodeLabelsManager.NO_LABEL, emptyLabelResource);

    capacityHandler.updateRoot(cs.getQueue("root"), clusterResource);
    QueueCapacityUpdateContext result =
        capacityHandler.updateChildren(clusterResource, cs.getQueue("root"));

    assertions.finishAssertion();

    return result;
  }

  protected QueueAssertionBuilder createAssertionBuilder() {
    return new QueueAssertionBuilder(cs);
  }

  /** Builds a capacity vector string from the given memory and vcores parts. */
  protected static String createCapacityVector(Object memory, Object vcores) {
    return String.format(CAPACITY_VECTOR_TEMPLATE, memory, vcores);
  }

  /** Formats an absolute value; the fraction is dropped. */
  protected static String absolute(double value) {
    long wholeValue = (long) value;
    return String.valueOf(wholeValue);
  }

  /** Formats a weight value with the "w" suffix. */
  protected static String weight(float value) {
    return value + "w";
  }

  /** Formats a percentage value with the "%" suffix. */
  protected static String percentage(float value) {
    return value + "%";
  }

  /** Creates a resource from the given values; both are truncated to int. */
  protected static Resource createResource(double memory, double vcores) {
    int memoryValue = (int) memory;
    int vcoresValue = (int) vcores;
    return Resource.newInstance(memoryValue, vcoresValue);
  }
}

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.junit.Assert;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
/**
* Provides a fluent API to assert resource and capacity attributes of queues.
*/
class QueueAssertionBuilder {
  private static final String EFFECTIVE_MAX_RES_INFO = "Effective Maximum Resource";
  private static final BiFunction<QueueResourceQuotas, String, Resource> EFFECTIVE_MAX_RES =
      QueueResourceQuotas::getEffectiveMaxResource;

  private static final String EFFECTIVE_MIN_RES_INFO = "Effective Minimum Resource";
  private static final BiFunction<QueueResourceQuotas, String, Resource> EFFECTIVE_MIN_RES =
      QueueResourceQuotas::getEffectiveMinResource;

  private static final String CAPACITY_INFO = "Capacity";
  private static final BiFunction<QueueCapacities, String, Float> CAPACITY =
      QueueCapacities::getCapacity;

  private static final String ABS_CAPACITY_INFO = "Absolute Capacity";
  private static final BiFunction<QueueCapacities, String, Float> ABS_CAPACITY =
      QueueCapacities::getAbsoluteCapacity;

  private static final String ASSERTION_ERROR_MESSAGE =
      "'%s' of queue '%s' does not match %f for label %s";
  private static final String RESOURCE_ASSERTION_ERROR_MESSAGE =
      "'%s' of queue '%s' does not match %s for label %s";

  private final CapacityScheduler cs;
  // Keyed by queue path. The insertion order is kept, therefore the assertions are executed in
  // the order in which the queues were first referenced.
  private final Map<String, QueueAssertion> assertions = new LinkedHashMap<>();

  QueueAssertionBuilder(CapacityScheduler cs) {
    this.cs = cs;
  }

  /**
   * Collects the assertions of a single queue. The checks are only recorded here, they are
   * executed by {@link QueueAssertionBuilder#finishAssertion()}.
   */
  public class QueueAssertion {
    private final String queuePath;
    private final List<QueueAssertion.ValueAssertion> assertions = new ArrayList<>();

    QueueAssertion(String queuePath) {
      this.queuePath = queuePath;
    }

    /**
     * Switches to the assertion group of another queue.
     * @param queuePath path of the queue
     * @return queue assertion group of the given queue
     */
    public QueueAssertion withQueue(String queuePath) {
      return QueueAssertionBuilder.this.withQueue(queuePath);
    }

    /**
     * @return the enclosing builder
     */
    public QueueAssertionBuilder build() {
      return QueueAssertionBuilder.this.build();
    }

    /**
     * Records a check of the effective maximum resource for the empty label.
     * @param expected expected resource
     * @return this assertion group
     */
    public QueueAssertion assertEffectiveMaxResource(Resource expected) {
      return addResourceAssertion(expected, NO_LABEL, EFFECTIVE_MAX_RES, EFFECTIVE_MAX_RES_INFO);
    }

    /**
     * Records a check of the effective minimum resource for the given label.
     * @param expected expected resource
     * @param label node label
     * @return this assertion group
     */
    public QueueAssertion assertEffectiveMinResource(Resource expected, String label) {
      return addResourceAssertion(expected, label, EFFECTIVE_MIN_RES, EFFECTIVE_MIN_RES_INFO);
    }

    /**
     * Records a check of the effective minimum resource for the empty label.
     * @param expected expected resource
     * @return this assertion group
     */
    public QueueAssertion assertEffectiveMinResource(Resource expected) {
      return assertEffectiveMinResource(expected, NO_LABEL);
    }

    /**
     * Records a check of the capacity for the empty label.
     * @param expected expected capacity
     * @return this assertion group
     */
    public QueueAssertion assertCapacity(double expected) {
      return addCapacityAssertion(expected, CAPACITY, CAPACITY_INFO);
    }

    /**
     * Records a check of the absolute capacity for the empty label.
     * @param expected expected absolute capacity
     * @return this assertion group
     */
    public QueueAssertion assertAbsoluteCapacity(double expected) {
      return addCapacityAssertion(expected, ABS_CAPACITY, ABS_CAPACITY_INFO);
    }

    // Registers a Resource based check; fails immediately if the queue does not exist.
    private QueueAssertion addResourceAssertion(
        Resource expected, String label,
        BiFunction<QueueResourceQuotas, String, Resource> getter, String messageInfo) {
      ValueAssertion valueAssertion = new ValueAssertion(expected);
      valueAssertion.setLabel(label);
      valueAssertion.withResourceSupplier(getter, messageInfo);
      assertions.add(valueAssertion);
      return this;
    }

    // Registers a float capacity based check; fails immediately if the queue does not exist.
    private QueueAssertion addCapacityAssertion(
        double expected, BiFunction<QueueCapacities, String, Float> getter, String messageInfo) {
      ValueAssertion valueAssertion = new ValueAssertion(expected);
      valueAssertion.withCapacitySupplier(getter, messageInfo);
      assertions.add(valueAssertion);
      return this;
    }

    /**
     * A single recorded check. Exactly one of the suppliers is set, depending on whether the
     * check compares a {@link Resource} or a capacity value.
     */
    private class ValueAssertion {
      private double expectedValue = 0;
      private Resource expectedResource = null;
      private String assertionType;
      private Supplier<Float> valueSupplier;
      private Supplier<Resource> resourceSupplier;
      private String label = NO_LABEL;

      ValueAssertion(double expectedValue) {
        this.expectedValue = expectedValue;
      }

      ValueAssertion(Resource expectedResource) {
        this.expectedResource = expectedResource;
      }

      public void setLabel(String label) {
        this.label = label;
      }

      public void withResourceSupplier(
          BiFunction<QueueResourceQuotas, String, Resource> assertion, String messageInfo) {
        CSQueue queue = cs.getQueue(queuePath);
        if (queue == null) {
          Assert.fail("Queue " + queuePath + " is not found");
        }

        assertionType = messageInfo;
        // The label field is read when the supplier is invoked, not when it is created.
        resourceSupplier = () -> assertion.apply(queue.getQueueResourceQuotas(), label);
      }

      public void withCapacitySupplier(
          BiFunction<QueueCapacities, String, Float> assertion, String messageInfo) {
        CSQueue queue = cs.getQueue(queuePath);
        if (queue == null) {
          Assert.fail("Queue " + queuePath + " is not found");
        }

        assertionType = messageInfo;
        // The label field is read when the supplier is invoked, not when it is created.
        valueSupplier = () -> assertion.apply(queue.getQueueCapacities(), label);
      }
    }
  }

  /**
   * @return this builder
   */
  public QueueAssertionBuilder build() {
    return this;
  }

  /**
   * Creates a new assertion group for a specific queue, or returns the existing group if the
   * queue has already been referenced.
   * @param queuePath path of the queue
   * @return queue assertion group
   */
  public QueueAssertion withQueue(String queuePath) {
    // computeIfAbsent only instantiates a group when the queue is seen for the first time.
    return assertions.computeIfAbsent(queuePath, path -> new QueueAssertion(path));
  }

  /**
   * Executes assertions created for all queues.
   */
  public void finishAssertion() {
    for (Map.Entry<String, QueueAssertion> assertionEntry : assertions.entrySet()) {
      String queuePath = assertionEntry.getKey();
      for (QueueAssertion.ValueAssertion assertion : assertionEntry.getValue().assertions) {
        if (assertion.resourceSupplier != null) {
          // The expected resource is formatted by %s directly, so a null expectation produces a
          // regular assertion failure instead of a NullPointerException.
          String errorMessage = String.format(RESOURCE_ASSERTION_ERROR_MESSAGE,
              assertion.assertionType, queuePath, assertion.expectedResource, assertion.label);
          Assert.assertEquals(errorMessage, assertion.expectedResource,
              assertion.resourceSupplier.get());
        } else {
          String errorMessage = String.format(ASSERTION_ERROR_MESSAGE,
              assertion.assertionType, queuePath, assertion.expectedValue, assertion.label);
          Assert.assertEquals(errorMessage, assertion.expectedValue,
              assertion.valueSupplier.get(), EPSILON);
        }
      }
    }
  }

  /**
   * Returns all queues that have defined assertions.
   * @return queue paths
   */
  public Set<String> getQueues() {
    return assertions.keySet();
  }
}

View File

@ -0,0 +1,536 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueUpdateWarning.QueueUpdateWarningType;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.GB;
public class TestMixedQueueResourceCalculation extends CapacitySchedulerQueueCalculationTestBase {
// Totals used to derive the vectors and expected resources below; the values are the same as
// the ones of UPDATE_RESOURCE.
private static final long MEMORY = 16384;
private static final long VCORES = 16;
// Capacity vectors set by setupQueueHierarchyWithWarnings and
// setupQueueHierarchyWithoutRemainingResource.
private static final String C_VECTOR_WITH_WARNING = createCapacityVector(weight(3),
absolute(VCORES * 0.25));
private static final String A11_VECTOR_WITH_WARNING = createCapacityVector(weight(1),
absolute(VCORES * 0.25));
private static final String A1_VECTOR_WITH_WARNING = createCapacityVector(absolute(2048),
absolute(VCORES * 0.25));
private static final String C_VECTOR_NO_REMAINING_RESOURCE = createCapacityVector(weight(3),
absolute(VCORES * 0.25));
private static final String A1_VECTOR_NO_REMAINING_RESOURCE = createCapacityVector(weight(1),
absolute(VCORES * 0.25));
// Expected resources and maximum capacity vectors of testMaximumResourceWarnings.
private static final Resource A12_EXPECTED_MAX_RESOURCE_MAX_WARNINGS =
createResource(MEMORY * 0.5, VCORES);
private static final Resource A11_EXPECTED_MAX_RESOURCE_MAX_WARNINGS =
createResource(MEMORY * 0.5, 0.1 * VCORES);
private static final Resource A11_EXPECTED_MIN_RESOURCE_MAX_WARNINGS =
createResource(0.5 * 0.5 * MEMORY, 0.1 * VCORES);
private static final Resource A12_EXPECTED_MIN_RESOURCE_MAX_WARNINGS =
createResource(0.5 * 0.5 * MEMORY, 0);
private static final String A11_MAX_VECTOR_MAX_WARNINGS =
createCapacityVector(absolute(MEMORY), percentage(10));
private static final String A1_MAX_VECTOR_MAX_WARNINGS =
createCapacityVector(absolute(MEMORY * 0.5),
percentage(100));
// Resource passed to update() as the cluster resource by the tests of this class.
private static final Resource UPDATE_RESOURCE = Resource.newInstance(16384, 16);
private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
// Expected effective minimum resources of testComplexHierarchyWithoutRemainingResource.
private static final Resource A_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(2486, 9);
private static final Resource A1_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(621, 4);
private static final Resource A11_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(217, 1);
private static final Resource A12_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(403, 3);
private static final Resource A2_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(1865, 5);
private static final Resource B_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(8095, 3);
private static final Resource B1_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(8095, 3);
private static final Resource C_COMPLEX_NO_REMAINING_RESOURCE = Resource.newInstance(5803, 4);
// Expected effective minimum resources of testComplexHierarchyWithWarnings.
private static final Resource B_WARNING_RESOURCE = Resource.newInstance(8096, 4);
private static final Resource B1_WARNING_RESOURCE = Resource.newInstance(8096, 3);
private static final Resource A_WARNING_RESOURCE = Resource.newInstance(8288, 12);
private static final Resource A1_WARNING_RESOURCE = Resource.newInstance(2048, 4);
private static final Resource A2_WARNING_RESOURCE = Resource.newInstance(2048, 8);
private static final Resource A12_WARNING_RESOURCE = Resource.newInstance(2048, 4);
// Capacity vectors of testZeroResourceIfNoMemory.
private static final String A_VECTOR_ZERO_RESOURCE =
createCapacityVector(percentage(100), weight(6));
private static final String B_VECTOR_ZERO_RESOURCE =
createCapacityVector(absolute(MEMORY), absolute(VCORES * 0.5));
// Vectors and expected maximum resources of testDifferentMinimumAndMaximumCapacityTypes.
private static final String A_MAX_VECTOR_DIFFERENT_MIN_MAX =
createCapacityVector(absolute(MEMORY), percentage(80));
private static final Resource B_EXPECTED_MAX_RESOURCE_DIFFERENT_MIN_MAX =
Resource.newInstance(MEMORY, (int) (VCORES * 0.5));
private static final Resource A_EXPECTED_MAX_RESOURCE_DIFFERENT_MIN_MAX =
Resource.newInstance(MEMORY, (int) (VCORES * 0.8));
private static final String B_MAX_VECTOR_DIFFERENT_MIN_MAX =
createCapacityVector(absolute(MEMORY), absolute(VCORES * 0.5));
private static final String A_MIN_VECTOR_DIFFERENT_MIN_MAX =
createCapacityVector(percentage(50), absolute(VCORES * 0.5));
private static final String B_MIN_VECTOR_DIFFERENT_MIN_MAX =
createCapacityVector(weight(6), percentage(100));
// Maximum capacity vector with a weight entry; the test expects it to be rejected.
private static final String B_INVALID_MAX_VECTOR =
createCapacityVector(absolute(MEMORY), weight(10));
// Node labels, hosts and node sizes used by setLabeledQueueConfigs.
private static final String X_LABEL = "x";
private static final String Y_LABEL = "y";
private static final String Z_LABEL = "z";
private static final String H1_NODE = "h1";
private static final String H2_NODE = "h2";
private static final String H3_NODE = "h3";
private static final String H4_NODE = "h4";
private static final String H5_NODE = "h5";
private static final int H1_MEMORY = 60 * GB;
private static final int H1_VCORES = 60;
private static final int H2_MEMORY = 10 * GB;
private static final int H2_VCORES = 25;
private static final int H3_VCORES = 35;
private static final int H3_MEMORY = 10 * GB;
private static final int H4_MEMORY = 10 * GB;
private static final int H4_VCORES = 15;
// Minimum capacity vectors of testMaximumResourceWarnings.
private static final String A11_MIN_VECTOR_MAX_WARNINGS =
createCapacityVector(percentage(50), percentage(100));
private static final String A12_MIN_VECTOR_MAX_WARNINGS =
createCapacityVector(percentage(50), percentage(0));
// Expected effective minimum resources of testNodeLabels, grouped by label.
private static final Resource A_EXPECTED_MIN_RESOURCE_NO_LABEL = createResource(2048, 8);
private static final Resource A1_EXPECTED_MIN_RESOURCE_NO_LABEL = createResource(1024, 5);
private static final Resource A2_EXPECTED_MIN_RESOURCE_NO_LABEL = createResource(1024, 2);
private static final Resource B_EXPECTED_MIN_RESOURCE_NO_LABEL = createResource(3072, 8);
private static final Resource A_EXPECTED_MIN_RESOURCE_X_LABEL = createResource(30720, 30);
private static final Resource A1_EXPECTED_MIN_RESOURCE_X_LABEL = createResource(20480, 0);
private static final Resource A2_EXPECTED_MIN_RESOURCE_X_LABEL = createResource(10240, 30);
private static final Resource B_EXPECTED_MIN_RESOURCE_X_LABEL = createResource(30720, 30);
private static final Resource A_EXPECTED_MIN_RESOURCE_Y_LABEL = createResource(8096, 42);
private static final Resource A1_EXPECTED_MIN_RESOURCE_Y_LABEL = createResource(6186, 21);
private static final Resource A2_EXPECTED_MIN_RESOURCE_Y_LABEL = createResource(1910, 21);
private static final Resource B_EXPECTED_MIN_RESOURCE_Y_LABEL = createResource(12384, 18);
private static final Resource A_EXPECTED_MIN_RESOURCE_Z_LABEL = createResource(7168, 11);
private static final Resource A1_EXPECTED_MIN_RESOURCE_Z_LABEL = createResource(6451, 4);
private static final Resource A2_EXPECTED_MIN_RESOURCE_Z_LABEL = createResource(716, 7);
private static final Resource B_EXPECTED_MIN_RESOURCE_Z_LABEL = createResource(3072, 4);
// Passed to update() as the resource of the empty label in testNodeLabels.
private static final Resource EMPTY_LABEL_RESOURCE = Resource.newInstance(5 * GB, 16);
// Capacity vectors set by setLabeledQueueConfigs, grouped by label.
private static final String A_VECTOR_NO_LABEL =
createCapacityVector(absolute(2048), percentage(50));
private static final String A1_VECTOR_NO_LABEL =
createCapacityVector(absolute(1024), percentage(70));
private static final String A2_VECTOR_NO_LABEL =
createCapacityVector(absolute(1024), percentage(30));
private static final String B_VECTOR_NO_LABEL =
createCapacityVector(weight(3), percentage(50));
private static final String A_VECTOR_X_LABEL =
createCapacityVector(percentage(50), weight(3));
private static final String A1_VECTOR_X_LABEL =
createCapacityVector(absolute(20480), percentage(10));
private static final String A2_VECTOR_X_LABEL =
createCapacityVector(absolute(10240), absolute(30));
private static final String B_VECTOR_X_LABEL =
createCapacityVector(percentage(50), percentage(50));
private static final String A_VECTOR_Y_LABEL =
createCapacityVector(absolute(8096), weight(1));
private static final String A1_VECTOR_Y_LABEL =
createCapacityVector(absolute(6186), weight(3));
private static final String A2_VECTOR_Y_LABEL =
createCapacityVector(weight(3), weight(3));
private static final String B_VECTOR_Y_LABEL =
createCapacityVector(percentage(100), percentage(30));
private static final String A_VECTOR_Z_LABEL =
createCapacityVector(percentage(70), absolute(11));
private static final String A1_VECTOR_Z_LABEL =
createCapacityVector(percentage(90), percentage(40));
private static final String A2_VECTOR_Z_LABEL =
createCapacityVector(percentage(10), weight(4));
private static final String B_VECTOR_Z_LABEL =
createCapacityVector(percentage(30), absolute(4));
// Remaining capacity vectors set by setupQueueHierarchyWithoutRemainingResource.
private static final String A_VECTOR_NO_REMAINING_RESOURCE =
createCapacityVector(percentage(30), weight(6));
private static final String A11_VECTOR_NO_REMAINING_RESOURCE =
createCapacityVector(percentage(35), percentage(25));
private static final String A12_VECTOR_NO_REMAINING_RESOURCE =
createCapacityVector(percentage(65), percentage(75));
private static final String A2_VECTOR_NO_REMAINING_RESOURCE =
createCapacityVector(weight(3), percentage(100));
private static final String B_VECTOR_NO_REMAINING_RESOURCE =
createCapacityVector(absolute(8095), percentage(30));
private static final String B1_VECTOR_NO_REMAINING_RESOURCE =
createCapacityVector(weight(5), absolute(3));
// Remaining capacity vectors set by setupQueueHierarchyWithWarnings.
private static final String A_VECTOR_WITH_WARNINGS =
createCapacityVector(percentage(100), weight(6));
private static final String A12_VECTOR_WITH_WARNING =
createCapacityVector(percentage(100), percentage(100));
private static final String A2_VECTOR_WITH_WARNING =
createCapacityVector(absolute(2048), percentage(100));
private static final String B_VECTOR_WITH_WARNING =
createCapacityVector(absolute(8096), percentage(30));
private static final String B1_VECTOR_WITH_WARNING =
createCapacityVector(absolute(10256), absolute(3));
/**
 * Runs the set up of the base class, then turns legacy queue mode off on the shared
 * configuration.
 *
 * @throws Exception if the set up of the base class fails
 */
@Override
public void setUp() throws Exception {
  super.setUp();
  // Only csConf is modified here; the tests reinitialize the scheduler with it later on.
  csConf.setLegacyQueueModeEnabled(false);
}
/**
 * Tests a complex scenario in which no warning or remaining resource is generated during the
 * update phase (except for rounding leftovers, eg. 1 memory or 1 vcores).
 *
 * <pre>
 *           -root-
 *          /  |   \
 *         A   B    C
 *        / \  |
 *      A1  A2 B1
 *     /  \
 *   A11  A12
 * </pre>
 *
 * For every queue the effective minimum resource is checked, as well as the absolute capacity,
 * which is expected to be the ratio of that resource to the cluster resource.
 *
 * @throws IOException if update is failed
 */
@Test
public void testComplexHierarchyWithoutRemainingResource() throws IOException {
  setupQueueHierarchyWithoutRemainingResource();

  // withQueue returns the assertion group of the queue, the builder itself collects them.
  QueueAssertionBuilder assertionBuilder = createAssertionBuilder();
  assertionBuilder.withQueue(A)
      .assertEffectiveMinResource(A_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(A1)
      .assertEffectiveMinResource(A1_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A1_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(A11)
      .assertEffectiveMinResource(A11_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A11_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(A12)
      .assertEffectiveMinResource(A12_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A12_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(A2)
      .assertEffectiveMinResource(A2_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A2_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(B)
      .assertEffectiveMinResource(B_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          B_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(B1)
      .assertEffectiveMinResource(B1_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          B1_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(C)
      .assertEffectiveMinResource(C_COMPLEX_NO_REMAINING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          C_COMPLEX_NO_REMAINING_RESOURCE, UPDATE_RESOURCE));

  update(assertionBuilder, UPDATE_RESOURCE);
}
/**
 * Tests a complex scenario in which several validation warnings are generated during the update
 * phase.
 *
 * <pre>
 *           -root-
 *          /  |   \
 *         A   B    C
 *        / \  |
 *      A1  A2 B1
 *     /  \
 *   A11  A12
 * </pre>
 *
 * Besides the effective minimum resources and absolute capacities, the presence of a zero
 * resource warning on C and A11, an underutilized branch warning on A and a downscaled branch
 * warning on B is checked.
 *
 * @throws IOException if update is failed
 */
@Test
public void testComplexHierarchyWithWarnings() throws IOException {
  setupQueueHierarchyWithWarnings();

  QueueAssertionBuilder assertionBuilder = createAssertionBuilder();
  assertionBuilder.withQueue(A)
      .assertEffectiveMinResource(A_WARNING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A_WARNING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(A1)
      .assertEffectiveMinResource(A1_WARNING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A1_WARNING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(A2)
      .assertEffectiveMinResource(A2_WARNING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A2_WARNING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(A11)
      .assertEffectiveMinResource(ZERO_RESOURCE)
      .assertAbsoluteCapacity(0);
  assertionBuilder.withQueue(A12)
      .assertEffectiveMinResource(A12_WARNING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          A12_WARNING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(B)
      .assertEffectiveMinResource(B_WARNING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          B_WARNING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(B1)
      .assertEffectiveMinResource(B1_WARNING_RESOURCE)
      .assertAbsoluteCapacity(resourceCalculator.divide(UPDATE_RESOURCE,
          B1_WARNING_RESOURCE, UPDATE_RESOURCE));
  assertionBuilder.withQueue(C)
      .assertEffectiveMinResource(ZERO_RESOURCE)
      .assertAbsoluteCapacity(0);

  QueueCapacityUpdateContext updateContext = update(assertionBuilder, UPDATE_RESOURCE);
  Collection<QueueUpdateWarning> warnings = updateContext.getUpdateWarnings();

  Optional<QueueUpdateWarning> zeroResourceOnC =
      getSpecificWarning(warnings, QueueUpdateWarningType.QUEUE_ZERO_RESOURCE, C);
  Optional<QueueUpdateWarning> underutilizedA =
      getSpecificWarning(warnings, QueueUpdateWarningType.BRANCH_UNDERUTILIZED, A);
  Optional<QueueUpdateWarning> downscaledB =
      getSpecificWarning(warnings, QueueUpdateWarningType.BRANCH_DOWNSCALED, B);
  Optional<QueueUpdateWarning> zeroResourceOnA11 =
      getSpecificWarning(warnings, QueueUpdateWarningType.QUEUE_ZERO_RESOURCE, A11);

  Assert.assertTrue(zeroResourceOnC.isPresent());
  Assert.assertTrue(underutilizedA.isPresent());
  Assert.assertTrue(downscaledB.isPresent());
  Assert.assertTrue(zeroResourceOnA11.isPresent());
}
/**
 * Configures A with a percentage memory / weight vcores vector and B with an absolute vector
 * covering the whole memory, then expects A to end up with zero effective minimum resource,
 * a zero resource warning on A and an underutilized branch warning on root.
 *
 * @throws IOException if update is failed
 */
@Test
public void testZeroResourceIfNoMemory() throws IOException {
  csConf.setCapacityVector(A, NO_LABEL, A_VECTOR_ZERO_RESOURCE);
  csConf.setCapacityVector(B, NO_LABEL, B_VECTOR_ZERO_RESOURCE);

  Resource expectedMinResourceOfB = createResource(MEMORY, VCORES * 0.5);
  QueueAssertionBuilder assertionBuilder = createAssertionBuilder();
  assertionBuilder.withQueue(A).assertEffectiveMinResource(ZERO_RESOURCE);
  assertionBuilder.withQueue(B).assertEffectiveMinResource(expectedMinResourceOfB);

  QueueCapacityUpdateContext updateContext = update(assertionBuilder, UPDATE_RESOURCE);
  Collection<QueueUpdateWarning> warnings = updateContext.getUpdateWarnings();

  Optional<QueueUpdateWarning> zeroResourceOnA =
      getSpecificWarning(warnings, QueueUpdateWarningType.QUEUE_ZERO_RESOURCE, A);
  Optional<QueueUpdateWarning> underutilizedRoot =
      getSpecificWarning(warnings, QueueUpdateWarningType.BRANCH_UNDERUTILIZED, ROOT);

  Assert.assertTrue(zeroResourceOnA.isPresent());
  Assert.assertTrue(underutilizedRoot.isPresent());
}
/**
 * Sets minimum and maximum capacity vectors of different capacity types on A and B and expects
 * the update to finish without warnings. Afterwards a maximum capacity vector containing a
 * weight is set on B, which is expected to be rejected with an IllegalStateException.
 *
 * @throws IOException if update is failed
 */
@Test
public void testDifferentMinimumAndMaximumCapacityTypes() throws IOException {
  csConf.setCapacityVector(A, NO_LABEL, A_MIN_VECTOR_DIFFERENT_MIN_MAX);
  csConf.setMaximumCapacityVector(A, NO_LABEL, A_MAX_VECTOR_DIFFERENT_MIN_MAX);
  csConf.setCapacityVector(B, NO_LABEL, B_MIN_VECTOR_DIFFERENT_MIN_MAX);
  csConf.setMaximumCapacityVector(B, NO_LABEL, B_MAX_VECTOR_DIFFERENT_MIN_MAX);

  // Both queues are expected to receive half of the cluster resource as minimum.
  Resource halfOfClusterResource = ResourceUtils.multiplyFloor(UPDATE_RESOURCE, 0.5d);
  QueueAssertionBuilder assertionBuilder = createAssertionBuilder();
  assertionBuilder.withQueue(A)
      .assertEffectiveMinResource(halfOfClusterResource)
      .assertEffectiveMaxResource(A_EXPECTED_MAX_RESOURCE_DIFFERENT_MIN_MAX);
  assertionBuilder.withQueue(B)
      .assertEffectiveMinResource(halfOfClusterResource)
      .assertEffectiveMaxResource(B_EXPECTED_MAX_RESOURCE_DIFFERENT_MIN_MAX);

  QueueCapacityUpdateContext updateContext = update(assertionBuilder, UPDATE_RESOURCE);
  Assert.assertEquals(0, updateContext.getUpdateWarnings().size());

  // WEIGHT capacity type for maximum capacity is not supported
  csConf.setMaximumCapacityVector(B, NO_LABEL, B_INVALID_MAX_VECTOR);
  try {
    cs.reinitialize(csConf, mockRM.getRMContext());
    update(assertionBuilder, UPDATE_RESOURCE);
    Assert.fail("WEIGHT maximum capacity type is not supported, an error should be thrown when " +
        "set up");
  } catch (IllegalStateException ignored) {
    // This is the expected outcome, there is nothing else to verify.
  }
}
/**
 * Sets maximum capacity vectors on A1 and A11 together with minimum vectors on A11 and A12,
 * then checks the effective minimum and maximum resources of A11 and A12, and expects both a
 * "max resource exceeds parent" and an "exceeds max resource" warning on A11.
 *
 * @throws IOException if update is failed
 */
@Test
public void testMaximumResourceWarnings() throws IOException {
  csConf.setMaximumCapacityVector(A1, NO_LABEL, A1_MAX_VECTOR_MAX_WARNINGS);
  csConf.setCapacityVector(A11, NO_LABEL, A11_MIN_VECTOR_MAX_WARNINGS);
  csConf.setCapacityVector(A12, NO_LABEL, A12_MIN_VECTOR_MAX_WARNINGS);
  csConf.setMaximumCapacityVector(A11, NO_LABEL, A11_MAX_VECTOR_MAX_WARNINGS);

  QueueAssertionBuilder assertionBuilder = createAssertionBuilder();
  assertionBuilder.withQueue(A11)
      .assertEffectiveMinResource(A11_EXPECTED_MIN_RESOURCE_MAX_WARNINGS)
      .assertEffectiveMaxResource(A11_EXPECTED_MAX_RESOURCE_MAX_WARNINGS);
  assertionBuilder.withQueue(A12)
      .assertEffectiveMinResource(A12_EXPECTED_MIN_RESOURCE_MAX_WARNINGS)
      .assertEffectiveMaxResource(A12_EXPECTED_MAX_RESOURCE_MAX_WARNINGS);

  QueueCapacityUpdateContext updateContext = update(assertionBuilder, UPDATE_RESOURCE);
  Collection<QueueUpdateWarning> warnings = updateContext.getUpdateWarnings();

  Optional<QueueUpdateWarning> maxExceedsParentOnA11 = getSpecificWarning(
      warnings, QueueUpdateWarningType.QUEUE_MAX_RESOURCE_EXCEEDS_PARENT, A11);
  Optional<QueueUpdateWarning> minExceedsMaxOnA11 = getSpecificWarning(
      warnings, QueueUpdateWarningType.QUEUE_EXCEEDS_MAX_RESOURCE, A11);

  Assert.assertTrue(maxExceedsParentOnA11.isPresent());
  Assert.assertTrue(minExceedsMaxOnA11.isPresent());
}
/**
 * Checks the effective minimum resource of A, A1, A2 and B for the empty label and for the
 * x, y and z labels after the labeled configuration has been applied.
 *
 * @throws Exception if the labeled set up or the update is failed
 */
@Test
public void testNodeLabels() throws Exception {
  setLabeledQueueConfigs();

  QueueAssertionBuilder assertionBuilder = createAssertionBuilder();

  // Referencing a queue again extends its existing assertion group, therefore the expectations
  // can be registered label by label.
  assertionBuilder.withQueue(A)
      .assertEffectiveMinResource(A_EXPECTED_MIN_RESOURCE_NO_LABEL, NO_LABEL)
      .withQueue(A1)
      .assertEffectiveMinResource(A1_EXPECTED_MIN_RESOURCE_NO_LABEL, NO_LABEL)
      .withQueue(A2)
      .assertEffectiveMinResource(A2_EXPECTED_MIN_RESOURCE_NO_LABEL, NO_LABEL)
      .withQueue(B)
      .assertEffectiveMinResource(B_EXPECTED_MIN_RESOURCE_NO_LABEL, NO_LABEL);

  assertionBuilder.withQueue(A)
      .assertEffectiveMinResource(A_EXPECTED_MIN_RESOURCE_X_LABEL, X_LABEL)
      .withQueue(A1)
      .assertEffectiveMinResource(A1_EXPECTED_MIN_RESOURCE_X_LABEL, X_LABEL)
      .withQueue(A2)
      .assertEffectiveMinResource(A2_EXPECTED_MIN_RESOURCE_X_LABEL, X_LABEL)
      .withQueue(B)
      .assertEffectiveMinResource(B_EXPECTED_MIN_RESOURCE_X_LABEL, X_LABEL);

  assertionBuilder.withQueue(A)
      .assertEffectiveMinResource(A_EXPECTED_MIN_RESOURCE_Y_LABEL, Y_LABEL)
      .withQueue(A1)
      .assertEffectiveMinResource(A1_EXPECTED_MIN_RESOURCE_Y_LABEL, Y_LABEL)
      .withQueue(A2)
      .assertEffectiveMinResource(A2_EXPECTED_MIN_RESOURCE_Y_LABEL, Y_LABEL)
      .withQueue(B)
      .assertEffectiveMinResource(B_EXPECTED_MIN_RESOURCE_Y_LABEL, Y_LABEL);

  assertionBuilder.withQueue(A)
      .assertEffectiveMinResource(A_EXPECTED_MIN_RESOURCE_Z_LABEL, Z_LABEL)
      .withQueue(A1)
      .assertEffectiveMinResource(A1_EXPECTED_MIN_RESOURCE_Z_LABEL, Z_LABEL)
      .withQueue(A2)
      .assertEffectiveMinResource(A2_EXPECTED_MIN_RESOURCE_Z_LABEL, Z_LABEL)
      .withQueue(B)
      .assertEffectiveMinResource(B_EXPECTED_MIN_RESOURCE_Z_LABEL, Z_LABEL);

  update(assertionBuilder, UPDATE_RESOURCE, EMPTY_LABEL_RESOURCE);
}
/**
 * Adds the x, y and z labels to the cluster, assigns them to the h1-h4 hosts (h5 is mapped to
 * the empty label set), registers the h1-h4 nodes, sets the capacity vectors of A, A1, A2 and B
 * for every label and finally reinitializes the scheduler.
 *
 * @throws Exception if a label operation, a node registration or the reinitialization fails
 */
private void setLabeledQueueConfigs() throws Exception {
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of(X_LABEL, Y_LABEL, Z_LABEL));
  mgr.addLabelsToNode(ImmutableMap.of(
      NodeId.newInstance(H1_NODE, 0), TestUtils.toSet(X_LABEL),
      NodeId.newInstance(H2_NODE, 0), TestUtils.toSet(Y_LABEL),
      NodeId.newInstance(H3_NODE, 0), TestUtils.toSet(Y_LABEL),
      NodeId.newInstance(H4_NODE, 0), TestUtils.toSet(Z_LABEL),
      NodeId.newInstance(H5_NODE, 0), RMNodeLabelsManager.EMPTY_STRING_SET));

  // Host to label mapping as configured above: h1 -> x, h2 -> y, h3 -> y, h4 -> z.
  mockRM.registerNode(H1_NODE + ":1234", H1_MEMORY, H1_VCORES);
  mockRM.registerNode(H2_NODE + ":1234", H2_MEMORY, H2_VCORES);
  mockRM.registerNode(H3_NODE + ":1234", H3_MEMORY, H3_VCORES);
  mockRM.registerNode(H4_NODE + ":1234", H4_MEMORY, H4_VCORES);

  // Empty label
  csConf.setCapacityVector(A, NO_LABEL, A_VECTOR_NO_LABEL);
  csConf.setCapacityVector(A1, NO_LABEL, A1_VECTOR_NO_LABEL);
  csConf.setCapacityVector(A2, NO_LABEL, A2_VECTOR_NO_LABEL);
  csConf.setCapacityVector(B, NO_LABEL, B_VECTOR_NO_LABEL);

  // Label x
  csConf.setCapacityVector(A, X_LABEL, A_VECTOR_X_LABEL);
  csConf.setCapacityVector(A1, X_LABEL, A1_VECTOR_X_LABEL);
  csConf.setCapacityVector(A2, X_LABEL, A2_VECTOR_X_LABEL);
  csConf.setCapacityVector(B, X_LABEL, B_VECTOR_X_LABEL);

  // Label y
  csConf.setCapacityVector(A, Y_LABEL, A_VECTOR_Y_LABEL);
  csConf.setCapacityVector(A1, Y_LABEL, A1_VECTOR_Y_LABEL);
  csConf.setCapacityVector(A2, Y_LABEL, A2_VECTOR_Y_LABEL);
  csConf.setCapacityVector(B, Y_LABEL, B_VECTOR_Y_LABEL);

  // Label z
  csConf.setCapacityVector(A, Z_LABEL, A_VECTOR_Z_LABEL);
  csConf.setCapacityVector(A1, Z_LABEL, A1_VECTOR_Z_LABEL);
  csConf.setCapacityVector(A2, Z_LABEL, A2_VECTOR_Z_LABEL);
  csConf.setCapacityVector(B, Z_LABEL, B_VECTOR_Z_LABEL);

  cs.reinitialize(csConf, mockRM.getRMContext());
}
/**
 * Builds the root/{A, B, C} hierarchy and applies the capacity vectors of the
 * "no remaining resource" scenario.
 *
 * @throws IOException if the scheduler reinitialization fails
 */
private void setupQueueHierarchyWithoutRemainingResource() throws IOException {
  // B is reinitialized in STOPPED state before the hierarchy is changed, and it is set back to
  // RUNNING once the new queues have been configured.
  csConf.setState(B, QueueState.STOPPED);
  cs.reinitialize(csConf, mockRM.getRMContext());
  setQueues();
  csConf.setState(B, QueueState.RUNNING);

  // Branch A
  csConf.setCapacityVector(A, NO_LABEL, A_VECTOR_NO_REMAINING_RESOURCE);
  csConf.setCapacityVector(A1, NO_LABEL, A1_VECTOR_NO_REMAINING_RESOURCE);
  csConf.setCapacityVector(A11, NO_LABEL, A11_VECTOR_NO_REMAINING_RESOURCE);
  csConf.setCapacityVector(A12, NO_LABEL, A12_VECTOR_NO_REMAINING_RESOURCE);
  csConf.setCapacityVector(A2, NO_LABEL, A2_VECTOR_NO_REMAINING_RESOURCE);

  // Branch B
  csConf.setCapacityVector(B, NO_LABEL, B_VECTOR_NO_REMAINING_RESOURCE);
  csConf.setCapacityVector(B1, NO_LABEL, B1_VECTOR_NO_REMAINING_RESOURCE);

  // Queue C
  csConf.setCapacityVector(C, NO_LABEL, C_VECTOR_NO_REMAINING_RESOURCE);

  cs.reinitialize(csConf, mockRM.getRMContext());
}
/**
 * Builds the root/{A, B, C} hierarchy and applies the capacity vectors of the scenario that is
 * expected to produce update warnings.
 *
 * @throws IOException if the scheduler reinitialization fails
 */
private void setupQueueHierarchyWithWarnings() throws IOException {
  // B is reinitialized in STOPPED state before the hierarchy is changed, and it is set back to
  // RUNNING once the new queues have been configured.
  csConf.setState(B, QueueState.STOPPED);
  cs.reinitialize(csConf, mockRM.getRMContext());
  setQueues();
  csConf.setState(B, QueueState.RUNNING);

  // Branch A
  csConf.setCapacityVector(A, NO_LABEL, A_VECTOR_WITH_WARNINGS);
  csConf.setCapacityVector(A1, NO_LABEL, A1_VECTOR_WITH_WARNING);
  csConf.setCapacityVector(A11, NO_LABEL, A11_VECTOR_WITH_WARNING);
  csConf.setCapacityVector(A12, NO_LABEL, A12_VECTOR_WITH_WARNING);
  csConf.setCapacityVector(A2, NO_LABEL, A2_VECTOR_WITH_WARNING);

  // Branch B
  csConf.setCapacityVector(B, NO_LABEL, B_VECTOR_WITH_WARNING);
  csConf.setCapacityVector(B1, NO_LABEL, B1_VECTOR_WITH_WARNING);

  // Queue C
  csConf.setCapacityVector(C, NO_LABEL, C_VECTOR_WITH_WARNING);

  cs.reinitialize(csConf, mockRM.getRMContext());
}
/**
 * Writes the child queue lists of {@code "root"}, {@code A} and {@code B} into the scheduler
 * configuration. The scheduler itself is not reinitialized here.
 */
private void setQueues() {
  String[] rootChildren = {"a", "b", "c"};
  String[] childrenOfA = {"a1", "a2"};
  String[] childrenOfB = {"b1"};

  csConf.setQueues("root", rootChildren);
  csConf.setQueues(A, childrenOfA);
  csConf.setQueues(B, childrenOfB);
}
/**
 * Finds the first warning, in the iteration order of {@code warnings}, that has the requested
 * type and was reported for the requested queue.
 *
 * @param warnings            warnings to search
 * @param warningTypeToSelect warning type a match must have
 * @param queue               value a match must return from {@code getQueue()}
 * @return the first matching warning, or an empty {@code Optional} when nothing matches
 */
private Optional<QueueUpdateWarning> getSpecificWarning(
    Collection<QueueUpdateWarning> warnings, QueueUpdateWarningType warningTypeToSelect,
    String queue) {
  for (QueueUpdateWarning candidate : warnings) {
    boolean typeMatches = candidate.getWarningType().equals(warningTypeToSelect);
    // The queue is only inspected once the type matched, as in a short-circuit "&&".
    if (typeMatches && candidate.getQueue().equals(queue)) {
      return Optional.of(candidate);
    }
  }
  return Optional.empty();
}
}

View File

@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
@ -50,21 +50,21 @@ public class TestQueueCapacityVector {
public void getResourceNamesByCapacityType() {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.PERCENTAGE);
capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
capacityVector.setResource(MEMORY_URI, 10, ResourceUnitCapacityType.PERCENTAGE);
capacityVector.setResource(VCORES_URI, 6, ResourceUnitCapacityType.PERCENTAGE);
// custom is not set, defaults to 0
Assert.assertEquals(1, capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.ABSOLUTE).size());
ResourceUnitCapacityType.ABSOLUTE).size());
Assert.assertTrue(capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.ABSOLUTE).contains(CUSTOM_RESOURCE));
ResourceUnitCapacityType.ABSOLUTE).contains(CUSTOM_RESOURCE));
Assert.assertEquals(2, capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.PERCENTAGE).size());
ResourceUnitCapacityType.PERCENTAGE).size());
Assert.assertTrue(capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.PERCENTAGE).contains(VCORES_URI));
ResourceUnitCapacityType.PERCENTAGE).contains(VCORES_URI));
Assert.assertTrue(capacityVector.getResourceNamesByCapacityType(
QueueCapacityType.PERCENTAGE).contains(MEMORY_URI));
ResourceUnitCapacityType.PERCENTAGE).contains(MEMORY_URI));
Assert.assertEquals(10, capacityVector.getResource(MEMORY_URI).getResourceValue(), EPSILON);
Assert.assertEquals(6, capacityVector.getResource(VCORES_URI).getResourceValue(), EPSILON);
}
@ -73,13 +73,15 @@ public class TestQueueCapacityVector {
public void isResourceOfType() {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT);
capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
capacityVector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE);
capacityVector.setResource(MEMORY_URI, 10, ResourceUnitCapacityType.WEIGHT);
capacityVector.setResource(VCORES_URI, 6, ResourceUnitCapacityType.PERCENTAGE);
capacityVector.setResource(CUSTOM_RESOURCE, 3, ResourceUnitCapacityType.ABSOLUTE);
Assert.assertTrue(capacityVector.isResourceOfType(MEMORY_URI, QueueCapacityType.WEIGHT));
Assert.assertTrue(capacityVector.isResourceOfType(VCORES_URI, QueueCapacityType.PERCENTAGE));
Assert.assertTrue(capacityVector.isResourceOfType(CUSTOM_RESOURCE, QueueCapacityType.ABSOLUTE));
Assert.assertTrue(capacityVector.isResourceOfType(MEMORY_URI, ResourceUnitCapacityType.WEIGHT));
Assert.assertTrue(capacityVector.isResourceOfType(VCORES_URI,
ResourceUnitCapacityType.PERCENTAGE));
Assert.assertTrue(capacityVector.isResourceOfType(CUSTOM_RESOURCE,
ResourceUnitCapacityType.ABSOLUTE));
}
@Test
@ -99,9 +101,9 @@ public class TestQueueCapacityVector {
public void testToString() {
QueueCapacityVector capacityVector = QueueCapacityVector.newInstance();
capacityVector.setResource(MEMORY_URI, 10, QueueCapacityType.WEIGHT);
capacityVector.setResource(VCORES_URI, 6, QueueCapacityType.PERCENTAGE);
capacityVector.setResource(CUSTOM_RESOURCE, 3, QueueCapacityType.ABSOLUTE);
capacityVector.setResource(MEMORY_URI, 10, ResourceUnitCapacityType.WEIGHT);
capacityVector.setResource(VCORES_URI, 6, ResourceUnitCapacityType.PERCENTAGE);
capacityVector.setResource(CUSTOM_RESOURCE, 3, ResourceUnitCapacityType.ABSOLUTE);
Assert.assertEquals(MIXED_CAPACITY_VECTOR_STRING, capacityVector.toString());

View File

@ -68,7 +68,7 @@ public class TestResourceVector {
public void testSubtract() {
ResourceVector lhsResourceVector = ResourceVector.of(13);
ResourceVector rhsResourceVector = ResourceVector.of(5);
lhsResourceVector.subtract(rhsResourceVector);
lhsResourceVector.decrement(rhsResourceVector);
Assert.assertEquals(8, lhsResourceVector.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(8, lhsResourceVector.getValue(VCORES_URI), EPSILON);
@ -77,7 +77,7 @@ public class TestResourceVector {
ResourceVector negativeResourceVector = ResourceVector.of(-100);
// Check whether overflow causes any issues
negativeResourceVector.subtract(ResourceVector.of(Float.MAX_VALUE));
negativeResourceVector.decrement(ResourceVector.of(Float.MAX_VALUE));
Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(MEMORY_URI), EPSILON);
Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(VCORES_URI), EPSILON);
Assert.assertEquals(-Float.MAX_VALUE, negativeResourceVector.getValue(CUSTOM_RESOURCE),
@ -111,7 +111,7 @@ public class TestResourceVector {
Assert.assertNotEquals(resource, resourceVector);
ResourceVector resourceVectorOne = ResourceVector.of(1);
resourceVectorOther.subtract(resourceVectorOne);
resourceVectorOther.decrement(resourceVectorOne);
Assert.assertEquals(resourceVectorOther, resourceVector);
}

View File

@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.GB;
/**
 * Tests queue resource calculation for hierarchies in which every queue is configured with the
 * same kind of capacity: weights, percentages or absolute resources.
 */
public class TestUniformQueueResourceCalculation
    extends CapacitySchedulerQueueCalculationTestBase {

  // Minimum resources configured per queue in the absolute resource test.
  private static final Resource QUEUE_A_RES = Resource.newInstance(80 * GB, 10);
  private static final Resource QUEUE_B_RES = Resource.newInstance(170 * GB, 30);
  private static final Resource QUEUE_A1_RES = Resource.newInstance(50 * GB, 4);
  private static final Resource QUEUE_A2_RES = Resource.newInstance(30 * GB, 6);
  private static final Resource QUEUE_A11_RES = Resource.newInstance(40 * GB, 2);
  private static final Resource QUEUE_A12_RES = Resource.newInstance(10 * GB, 2);

  // Resources handed to update(...) by the tests.
  private static final Resource UPDATE_RES = Resource.newInstance(250 * GB, 40);
  private static final Resource PERCENTAGE_ALL_RES = Resource.newInstance(10 * GB, 20);

  // Capacities expressed as fractions; multiplied by 100 when written to the configuration.
  public static final double A_CAPACITY = 0.3;
  public static final double B_CAPACITY = 0.7;
  public static final double A1_CAPACITY = 0.17;
  public static final double A11_CAPACITY = 0.25;
  public static final double A12_CAPACITY = 0.75;
  public static final double A2_CAPACITY = 0.83;

  // Raw queue weights.
  public static final float A_WEIGHT = 3;
  public static final float B_WEIGHT = 6;
  public static final float A1_WEIGHT = 2;
  public static final float A11_WEIGHT = 5;
  public static final float A12_WEIGHT = 8;
  public static final float A2_WEIGHT = 3;

  // Each weight divided by the sum of the weights of the queue and its sibling.
  public static final double A_NORMALIZED_WEIGHT = A_WEIGHT / (A_WEIGHT + B_WEIGHT);
  public static final double B_NORMALIZED_WEIGHT = B_WEIGHT / (A_WEIGHT + B_WEIGHT);
  public static final double A1_NORMALIZED_WEIGHT = A1_WEIGHT / (A1_WEIGHT + A2_WEIGHT);
  public static final double A2_NORMALIZED_WEIGHT = A2_WEIGHT / (A1_WEIGHT + A2_WEIGHT);
  public static final double A11_NORMALIZED_WEIGHT = A11_WEIGHT / (A11_WEIGHT + A12_WEIGHT);
  public static final double A12_NORMALIZED_WEIGHT = A12_WEIGHT / (A11_WEIGHT + A12_WEIGHT);

  /**
   * Configures every queue with a weight and expects the effective minimum resource and the
   * absolute capacity of each queue to follow the product of the normalized weights along its
   * path.
   */
  @Test
  public void testWeightResourceCalculation() throws IOException {
    csConf.setNonLabeledQueueWeight(A, A_WEIGHT);
    csConf.setNonLabeledQueueWeight(B, B_WEIGHT);
    csConf.setNonLabeledQueueWeight(A1, A1_WEIGHT);
    csConf.setNonLabeledQueueWeight(A11, A11_WEIGHT);
    csConf.setNonLabeledQueueWeight(A12, A12_WEIGHT);
    csConf.setNonLabeledQueueWeight(A2, A2_WEIGHT);

    // Expected absolute capacities, multiplied from the root downwards.
    final double absoluteA1 = A_NORMALIZED_WEIGHT * A1_NORMALIZED_WEIGHT;
    final double absoluteA2 = A_NORMALIZED_WEIGHT * A2_NORMALIZED_WEIGHT;
    final double absoluteA11 = absoluteA1 * A11_NORMALIZED_WEIGHT;
    final double absoluteA12 = absoluteA1 * A12_NORMALIZED_WEIGHT;

    QueueAssertionBuilder weightExpectations = createAssertionBuilder()
        .withQueue(A)
        .assertEffectiveMinResource(ResourceUtils.multiplyRound(UPDATE_RES, A_NORMALIZED_WEIGHT))
        .assertAbsoluteCapacity(A_NORMALIZED_WEIGHT)
        .withQueue(B)
        .assertEffectiveMinResource(ResourceUtils.multiplyRound(UPDATE_RES, B_NORMALIZED_WEIGHT))
        .assertAbsoluteCapacity(B_NORMALIZED_WEIGHT)
        .withQueue(A1)
        .assertEffectiveMinResource(ResourceUtils.multiplyRound(UPDATE_RES, absoluteA1))
        .assertAbsoluteCapacity(absoluteA1)
        .withQueue(A2)
        .assertEffectiveMinResource(ResourceUtils.multiplyRound(UPDATE_RES, absoluteA2))
        .assertAbsoluteCapacity(absoluteA2)
        .withQueue(A11)
        .assertEffectiveMinResource(ResourceUtils.multiplyRound(UPDATE_RES, absoluteA11))
        .assertAbsoluteCapacity(absoluteA11)
        .withQueue(A12)
        .assertEffectiveMinResource(ResourceUtils.multiplyRound(UPDATE_RES, absoluteA12))
        .assertAbsoluteCapacity(absoluteA12)
        .build();

    update(weightExpectations, UPDATE_RES);
  }

  /**
   * Configures every queue with a percentage capacity and expects the configured capacity, the
   * absolute capacity and the floored effective minimum resource of each queue.
   */
  @Test
  public void testPercentageResourceCalculation() throws IOException {
    csConf.setCapacity(A, (float) (A_CAPACITY * 100));
    csConf.setCapacity(B, (float) (B_CAPACITY * 100));
    csConf.setCapacity(A1, (float) (A1_CAPACITY * 100));
    csConf.setCapacity(A11, (float) (A11_CAPACITY * 100));
    csConf.setCapacity(A12, (float) (A12_CAPACITY * 100));
    csConf.setCapacity(A2, (float) (A2_CAPACITY * 100));

    final double absoluteA1 = A_CAPACITY * A1_CAPACITY;
    final double absoluteA2 = A_CAPACITY * A2_CAPACITY;
    // The leaf products deliberately start with the leaf capacity: the multiplication order
    // is part of the expected floating point value that gets floored below.
    final double absoluteA11 = A11_CAPACITY * A_CAPACITY * A1_CAPACITY;
    final double absoluteA12 = A12_CAPACITY * A_CAPACITY * A1_CAPACITY;

    QueueAssertionBuilder percentageExpectations = createAssertionBuilder()
        .withQueue(A)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(PERCENTAGE_ALL_RES, A_CAPACITY))
        .assertCapacity(A_CAPACITY)
        .assertAbsoluteCapacity(A_CAPACITY)
        .withQueue(B)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(PERCENTAGE_ALL_RES, B_CAPACITY))
        .assertCapacity(B_CAPACITY)
        .assertAbsoluteCapacity(B_CAPACITY)
        .withQueue(A1)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(PERCENTAGE_ALL_RES, absoluteA1))
        .assertCapacity(A1_CAPACITY)
        .assertAbsoluteCapacity(absoluteA1)
        .withQueue(A2)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(PERCENTAGE_ALL_RES, absoluteA2))
        .assertCapacity(A2_CAPACITY)
        .assertAbsoluteCapacity(absoluteA2)
        .withQueue(A11)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(PERCENTAGE_ALL_RES, absoluteA11))
        .assertCapacity(A11_CAPACITY)
        .assertAbsoluteCapacity(absoluteA11)
        .withQueue(A12)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(PERCENTAGE_ALL_RES, absoluteA12))
        .assertCapacity(A12_CAPACITY)
        .assertAbsoluteCapacity(absoluteA12)
        .build();

    update(percentageExpectations, PERCENTAGE_ALL_RES);
  }

  /**
   * Configures every queue with an absolute minimum resource and expects exactly that resource
   * after an update with {@code UPDATE_RES}, and half of it (floored) after an update with half
   * of {@code UPDATE_RES}.
   */
  @Test
  public void testAbsoluteResourceCalculation() throws IOException {
    final String emptyLabel = "";
    csConf.setMinimumResourceRequirement(emptyLabel, new QueuePath(A), QUEUE_A_RES);
    csConf.setMinimumResourceRequirement(emptyLabel, new QueuePath(B), QUEUE_B_RES);
    csConf.setMinimumResourceRequirement(emptyLabel, new QueuePath(A1), QUEUE_A1_RES);
    csConf.setMinimumResourceRequirement(emptyLabel, new QueuePath(A2), QUEUE_A2_RES);
    csConf.setMinimumResourceRequirement(emptyLabel, new QueuePath(A11), QUEUE_A11_RES);
    csConf.setMinimumResourceRequirement(emptyLabel, new QueuePath(A12), QUEUE_A12_RES);

    QueueAssertionBuilder fullResourceExpectations = createAssertionBuilder()
        .withQueue(A)
        .assertEffectiveMinResource(QUEUE_A_RES)
        .withQueue(B)
        .assertEffectiveMinResource(QUEUE_B_RES)
        .withQueue(A1)
        .assertEffectiveMinResource(QUEUE_A1_RES)
        .withQueue(A2)
        .assertEffectiveMinResource(QUEUE_A2_RES)
        .withQueue(A11)
        .assertEffectiveMinResource(QUEUE_A11_RES)
        .withQueue(A12)
        .assertEffectiveMinResource(QUEUE_A12_RES)
        .build();

    update(fullResourceExpectations, UPDATE_RES);

    // Second round: both the expectations and the update resource are scaled by one half.
    final float half = 0.5f;
    QueueAssertionBuilder halfResourceExpectations = createAssertionBuilder()
        .withQueue(A)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(QUEUE_A_RES, half))
        .withQueue(B)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(QUEUE_B_RES, half))
        .withQueue(A1)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(QUEUE_A1_RES, half))
        .withQueue(A2)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(QUEUE_A2_RES, half))
        .withQueue(A11)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(QUEUE_A11_RES, half))
        .withQueue(A12)
        .assertEffectiveMinResource(ResourceUtils.multiplyFloor(QUEUE_A12_RES, half))
        .build();

    update(halfResourceExpectations, ResourceUtils.multiplyFloor(UPDATE_RES, half));
  }
}

View File

@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityVectorEntry;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.QueueCapacityType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacityVector.ResourceUnitCapacityType;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Assert;
import org.junit.Test;
@ -33,7 +33,6 @@ import java.util.List;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.MEMORY_URI;
import static org.apache.hadoop.yarn.api.records.ResourceInformation.VCORES_URI;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources.GB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
@ -70,47 +69,43 @@ public class TestQueueCapacityConfigParser {
@Test
public void testPercentageCapacityConfig() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setCapacity(QUEUE, PERCENTAGE_VALUE);
QueueCapacityVector percentageCapacityVector = capacityConfigParser.parse(conf, QUEUE,
NO_LABEL);
QueueCapacityVector percentageCapacityVector =
capacityConfigParser.parse(Float.toString(PERCENTAGE_VALUE), QUEUE);
QueueCapacityVectorEntry memory = percentageCapacityVector.getResource(MEMORY_URI);
QueueCapacityVectorEntry vcore = percentageCapacityVector.getResource(VCORES_URI);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, memory.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.PERCENTAGE, memory.getVectorResourceType());
Assert.assertEquals(PERCENTAGE_VALUE, memory.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcore.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.PERCENTAGE, vcore.getVectorResourceType());
Assert.assertEquals(PERCENTAGE_VALUE, vcore.getResourceValue(), EPSILON);
QueueCapacityVector rootCapacityVector = capacityConfigParser.parse(conf,
CapacitySchedulerConfiguration.ROOT, NO_LABEL);
QueueCapacityVector rootCapacityVector =
capacityConfigParser.parse(Float.toString(PERCENTAGE_VALUE),
CapacitySchedulerConfiguration.ROOT);
QueueCapacityVectorEntry memoryRoot = rootCapacityVector.getResource(MEMORY_URI);
QueueCapacityVectorEntry vcoreRoot = rootCapacityVector.getResource(VCORES_URI);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, memoryRoot.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.PERCENTAGE, memoryRoot.getVectorResourceType());
Assert.assertEquals(100f, memoryRoot.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.PERCENTAGE, vcoreRoot.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.PERCENTAGE, vcoreRoot.getVectorResourceType());
Assert.assertEquals(100f, vcoreRoot.getResourceValue(), EPSILON);
}
@Test
public void testWeightCapacityConfig() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setNonLabeledQueueWeight(QUEUE, WEIGHT_VALUE);
QueueCapacityVector weightCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
QueueCapacityVector weightCapacityVector = capacityConfigParser.parse(WEIGHT_VALUE + "w",
QUEUE);
QueueCapacityVectorEntry memory = weightCapacityVector.getResource(MEMORY_URI);
QueueCapacityVectorEntry vcore = weightCapacityVector.getResource(VCORES_URI);
Assert.assertEquals(QueueCapacityType.WEIGHT, memory.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.WEIGHT, memory.getVectorResourceType());
Assert.assertEquals(WEIGHT_VALUE, memory.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.WEIGHT, vcore.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.WEIGHT, vcore.getVectorResourceType());
Assert.assertEquals(WEIGHT_VALUE, vcore.getResourceValue(), EPSILON);
}
@ -122,26 +117,26 @@ public class TestQueueCapacityConfigParser {
conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
ResourceUtils.resetResourceTypes(conf);
QueueCapacityVector absoluteCapacityVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
QueueCapacityVector absoluteCapacityVector = capacityConfigParser.parse(ABSOLUTE_RESOURCE,
QUEUE);
Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(MEMORY_URI)
.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.ABSOLUTE,
absoluteCapacityVector.getResource(MEMORY_URI).getVectorResourceType());
Assert.assertEquals(12 * GB, absoluteCapacityVector.getResource(MEMORY_URI)
.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI)
.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.ABSOLUTE,
absoluteCapacityVector.getResource(VCORES_URI).getVectorResourceType());
Assert.assertEquals(VCORE_ABSOLUTE, absoluteCapacityVector.getResource(VCORES_URI)
.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI)
.getVectorResourceType());
Assert.assertEquals(ResourceUnitCapacityType.ABSOLUTE,
absoluteCapacityVector.getResource(GPU_URI).getVectorResourceType());
Assert.assertEquals(GPU_ABSOLUTE, absoluteCapacityVector.getResource(GPU_URI)
.getResourceValue(), EPSILON);
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE) +
CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
QueueCapacityVector withoutGpuVector = capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
QueueCapacityVector withoutGpuVector = capacityConfigParser
.parse(ABSOLUTE_RESOURCE_MEMORY_VCORE, QUEUE);
Assert.assertEquals(3, withoutGpuVector.getResourceCount());
Assert.assertEquals(0f, withoutGpuVector.getResource(GPU_URI).getResourceValue(), EPSILON);
@ -150,36 +145,31 @@ public class TestQueueCapacityConfigParser {
@Test
public void testMixedCapacityConfig() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, MIXED_RESOURCE);
conf.set(YarnConfiguration.RESOURCE_TYPES, RESOURCE_TYPES);
ResourceUtils.resetResourceTypes(conf);
QueueCapacityVector mixedCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
capacityConfigParser.parse(MIXED_RESOURCE, QUEUE);
Assert.assertEquals(QueueCapacityType.ABSOLUTE,
Assert.assertEquals(ResourceUnitCapacityType.ABSOLUTE,
mixedCapacityVector.getResource(MEMORY_URI).getVectorResourceType());
Assert.assertEquals(MEMORY_MIXED, mixedCapacityVector.getResource(MEMORY_URI)
.getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.PERCENTAGE,
Assert.assertEquals(ResourceUnitCapacityType.PERCENTAGE,
mixedCapacityVector.getResource(VCORES_URI).getVectorResourceType());
Assert.assertEquals(PERCENTAGE_VALUE,
mixedCapacityVector.getResource(VCORES_URI).getResourceValue(), EPSILON);
Assert.assertEquals(QueueCapacityType.WEIGHT,
Assert.assertEquals(ResourceUnitCapacityType.WEIGHT,
mixedCapacityVector.getResource(GPU_URI).getVectorResourceType());
Assert.assertEquals(WEIGHT_VALUE,
mixedCapacityVector.getResource(GPU_URI).getResourceValue(), EPSILON);
// Test undefined capacity type default value
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, ABSOLUTE_RESOURCE_MEMORY_VCORE);
QueueCapacityVector mixedCapacityVectorWithGpuUndefined =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
Assert.assertEquals(QueueCapacityType.ABSOLUTE,
capacityConfigParser.parse(ABSOLUTE_RESOURCE_MEMORY_VCORE, QUEUE);
Assert.assertEquals(ResourceUnitCapacityType.ABSOLUTE,
mixedCapacityVectorWithGpuUndefined.getResource(MEMORY_URI).getVectorResourceType());
Assert.assertEquals(0, mixedCapacityVectorWithGpuUndefined.getResource(GPU_URI)
.getResourceValue(), EPSILON);
@ -188,52 +178,38 @@ public class TestQueueCapacityConfigParser {
@Test
public void testInvalidCapacityConfigs() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, NONEXISTINGSUFFIX);
QueueCapacityVector capacityVectorWithInvalidSuffix =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
capacityConfigParser.parse(NONEXISTINGSUFFIX, QUEUE);
List<QueueCapacityVectorEntry> entriesWithInvalidSuffix =
Lists.newArrayList(capacityVectorWithInvalidSuffix.iterator());
Assert.assertEquals(0, entriesWithInvalidSuffix.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, INVALID_CAPACITY_FORMAT);
QueueCapacityVector invalidDelimiterCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
capacityConfigParser.parse(INVALID_CAPACITY_FORMAT, QUEUE);
List<QueueCapacityVectorEntry> invalidDelimiterEntries =
Lists.newArrayList(invalidDelimiterCapacityVector.iterator());
Assert.assertEquals(0, invalidDelimiterEntries.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, INVALID_CAPACITY_BRACKET);
QueueCapacityVector invalidCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
capacityConfigParser.parse(INVALID_CAPACITY_BRACKET, QUEUE);
List<QueueCapacityVectorEntry> resources =
Lists.newArrayList(invalidCapacityVector.iterator());
Assert.assertEquals(0, resources.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, EMPTY_BRACKET);
QueueCapacityVector emptyBracketCapacityVector =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
capacityConfigParser.parse(EMPTY_BRACKET, QUEUE);
List<QueueCapacityVectorEntry> emptyEntries =
Lists.newArrayList(emptyBracketCapacityVector.iterator());
Assert.assertEquals(0, emptyEntries.size());
conf.set(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY, "");
QueueCapacityVector emptyCapacity =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
capacityConfigParser.parse("", QUEUE);
List<QueueCapacityVectorEntry> emptyResources =
Lists.newArrayList(emptyCapacity.iterator());
Assert.assertEquals(emptyResources.size(), 0);
conf.unset(CapacitySchedulerConfiguration.getQueuePrefix(QUEUE)
+ CapacitySchedulerConfiguration.CAPACITY);
QueueCapacityVector nonSetCapacity =
capacityConfigParser.parse(conf, QUEUE, NO_LABEL);
capacityConfigParser.parse(null, QUEUE);
List<QueueCapacityVectorEntry> nonSetResources =
Lists.newArrayList(nonSetCapacity.iterator());
Assert.assertEquals(nonSetResources.size(), 0);