diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 809a8603e25..4eb7b688209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -1298,7 +1298,7 @@ void updateEffectiveResources(Resource clusterResource) { CapacityConfigType.ABSOLUTE_RESOURCE)) { newEffectiveMinResource = createNormalizedMinResource( usageTracker.getQueueResourceQuotas().getConfiguredMinResource(label), - ((ParentQueue) parent).getEffectiveMinRatioPerResource()); + ((ParentQueue) parent).getEffectiveMinRatio(label)); // Max resource of a queue should be the minimum of {parent's maxResources, // this queue's maxResources}. 
Both parent's maxResources and this queue's diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index b77a90a3d33..c624aab71cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; @@ -101,11 +102,11 @@ public class ParentQueue extends AbstractCSQueue { private AutoCreatedQueueTemplate autoCreatedQueueTemplate; - // effective min ratio per resource, it is used during updateClusterResource, - // leaf queue can use this to calculate effective resources. - // This field will not be edited, reference will point to a new immutable map - // after every time recalculation - private volatile Map effectiveMinRatioPerResource; + // The ratio of the queue's effective minimum resource to the sum of the configured + // minimum resources of its children, grouped by label and calculated separately for + // each resource name.
+ private final Map> effectiveMinResourceRatio = + new ConcurrentHashMap<>(); public ParentQueue(CapacitySchedulerQueueContext queueContext, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -1328,8 +1329,8 @@ private void calculateEffectiveResourcesAndCapacity(String label, } } - effectiveMinRatioPerResource = getEffectiveMinRatioPerResource( - configuredMinResources, numeratorForMinRatio); + effectiveMinResourceRatio.put(label, getEffectiveMinRatio( + configuredMinResources, numeratorForMinRatio)); // Update effective resources for my self; if (rootQueue) { @@ -1340,7 +1341,7 @@ private void calculateEffectiveResourcesAndCapacity(String label, } } - private Map getEffectiveMinRatioPerResource( + private Map getEffectiveMinRatio( Resource configuredMinResources, Resource numeratorForMinRatio) { Map effectiveMinRatioPerResource = new HashMap<>(); if (numeratorForMinRatio != null) { @@ -1637,9 +1638,8 @@ void decrementRunnableApps() { } } - // This is a locking free method - Map getEffectiveMinRatioPerResource() { - return effectiveMinRatioPerResource; + Map getEffectiveMinRatio(String label) { + return effectiveMinResourceRatio.get(label); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java index d9051dd53e1..d7c80b5dda1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java @@ -22,12 +22,15 @@ import java.util.HashSet; import java.util.Set; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -91,6 +94,8 @@ public class TestAbsoluteResourceConfiguration { Resource.newInstance(25 * GB, 5); private static final Resource QUEUE_D_TEMPL_MAXRES = Resource.newInstance(150 * GB, 20); + public static final String X_LABEL = "X"; + public static final String Y_LABEL = "Y"; private static Set resourceTypes = new HashSet<>( Arrays.asList("memory", "vcores")); @@ -141,6 +146,26 @@ private CapacitySchedulerConfiguration setupComplexQueueConfiguration( return csConf; } + private CapacitySchedulerConfiguration setupLabeledConfiguration( + CapacitySchedulerConfiguration csConf) { + csConf.setMinimumResourceRequirement("", QUEUEA_FULL, Resource.newInstance(20 * GB, 8)); + csConf.setMinimumResourceRequirement("", QUEUEB_FULL, Resource.newInstance(10 * GB, 3)); + csConf.setMinimumResourceRequirement("", QUEUEC_FULL, Resource.newInstance(10 * GB, 2)); + csConf.setMinimumResourceRequirement("", QUEUED_FULL, Resource.newInstance(10 * GB, 2)); + + csConf.setMinimumResourceRequirement(X_LABEL, QUEUEA_FULL, Resource.newInstance(20 * GB, 8)); + 
csConf.setMinimumResourceRequirement(X_LABEL, QUEUEB_FULL, Resource.newInstance(10 * GB, 3)); + csConf.setMinimumResourceRequirement(X_LABEL, QUEUEC_FULL, Resource.newInstance(10 * GB, 2)); + csConf.setMinimumResourceRequirement(X_LABEL, QUEUED_FULL, Resource.newInstance(10 * GB, 2)); + + csConf.setMinimumResourceRequirement(Y_LABEL, QUEUEA_FULL, Resource.newInstance(2 * GB, 1)); + csConf.setMinimumResourceRequirement(Y_LABEL, QUEUEB_FULL, Resource.newInstance(2 * GB, 1)); + csConf.setMinimumResourceRequirement(Y_LABEL, QUEUEC_FULL, Resource.newInstance(2 * GB, 1)); + csConf.setMinimumResourceRequirement(Y_LABEL, QUEUED_FULL, Resource.newInstance(2 * GB, 2)); + + return csConf; + } + private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration( CapacitySchedulerConfiguration csConf) { @@ -576,6 +601,42 @@ public void testValidateAbsoluteResourceConfig() throws Exception { } } + @Test + public void testDownscalingForLabels() throws Exception { + CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(false); + setupLabeledConfiguration(csConf); + + csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(csConf); + rm.start(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * GB, 5); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * GB, 5); + MockNM nm3 = rm.registerNode("127.0.0.3:1234", 8 * GB, 5); + MockNM nm4 = rm.registerNode("127.0.0.4:1234", 8 * GB, 5); + + rm.getRMContext().getNodeLabelManager().addToCluserNodeLabelsWithDefaultExclusivity( + ImmutableSet.of(X_LABEL, Y_LABEL)); + rm.getRMContext().getNodeLabelManager().addLabelsToNode( + ImmutableMap.of(nm1.getNodeId(), ImmutableSet.of(X_LABEL), + nm2.getNodeId(), ImmutableSet.of(X_LABEL), + nm3.getNodeId(), ImmutableSet.of(X_LABEL), + nm4.getNodeId(), ImmutableSet.of(Y_LABEL))); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue root = cs.getRootQueue(); + 
root.updateClusterResource(cs.getClusterResource(), new ResourceLimits(cs.getClusterResource())); + + Resource childrenResource = root.getChildQueues().stream().map(q -> q.getEffectiveCapacity( + X_LABEL)).reduce(Resources::add).orElse(Resource.newInstance(0, 0)); + + Assert.assertTrue("Children of root have more resource than overall cluster resource", + Resources.greaterThan(cs.getResourceCalculator(), cs.getClusterResource(), + root.getEffectiveCapacity(X_LABEL), childrenResource)); + } + @Test public void testEffectiveResourceAfterReducingClusterResource() throws Exception {