From 008bd8afc3678accdcf5235d3d023a2e96d6fae1 Mon Sep 17 00:00:00 2001 From: Eric Badger Date: Thu, 23 Sep 2021 17:12:45 +0000 Subject: [PATCH] YARN-10935. AM Total Queue Limit goes below per-user AM Limit if parent is full. Contributed by Eric Payne. --- .../scheduler/capacity/LeafQueue.java | 11 ++- .../capacity/TestApplicationLimits.java | 79 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 4579811f18d..0656e76f6ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -741,8 +741,15 @@ public class LeafQueue extends AbstractCSQueue { // Current usable resource for this queue and partition is the max of // queueCurrentLimit and queuePartitionResource. - Resource queuePartitionUsableResource = Resources.max(resourceCalculator, - lastClusterResource, queueCurrentLimit, queuePartitionResource); + // If any of the resources available to this queue are less than queue's + // guarantee, use the guarantee as the queuePartitionUsableResource + // because nothing less than the queue's guarantee should be used when + // calculating the AM limit. + Resource queuePartitionUsableResource = + (Resources.fitsIn(resourceCalculator, + labelManager.getResourceByLabel(nodePartition, lastClusterResource), + queuePartitionResource, queueCurrentLimit)) ? + queueCurrentLimit : queuePartitionResource; Resource amResouceLimit = Resources.multiplyAndNormalizeUp( resourceCalculator, queuePartitionUsableResource, amResourcePercent, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index e9b1f9d795a..0a65d13e3e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; @@ -860,4 +861,82 @@ public class TestApplicationLimits { rm.killApp(app14.getApplicationId()); rm.stop(); } + + // Test that max AM limit is correct in the case where one resource is + // depleted but the other is not. Use DominantResourceCalculator. + @Test + public void testAMResourceLimitWithDRCAndFullParent() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + String qpathA = CapacitySchedulerConfiguration.ROOT + "." + A; + String qpathB = CapacitySchedulerConfiguration.ROOT + "." + B; + String capacityA = CapacitySchedulerConfiguration.PREFIX + qpathA + "." + + CapacitySchedulerConfiguration.CAPACITY; + String capacityB = CapacitySchedulerConfiguration.PREFIX + qpathB + "." + + CapacitySchedulerConfiguration.CAPACITY; + csConf.set(capacityA, "30.0"); + csConf.set(capacityB, "70.0"); + csConf.setMaximumApplicationMasterResourcePerQueuePercent(qpathA, 0.3f); + YarnConfiguration conf = new YarnConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(16*GB)); + when(csContext.getResourceCalculator()). + thenReturn(new DominantResourceCalculator()); + when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); + + // Total cluster resources. + Resource clusterResource = Resources.createResource(100 * GB, 1000); + when(csContext.getClusterResource()).thenReturn(clusterResource); + + // Set up queue hierarchy. + Map queues = new HashMap(); + CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext, + csConf, null, "root", queues, queues, TestUtils.spyHook); + rootQueue.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + // Queue "queueA" has a 30% capacity guarantee. The max pct of "queueA" that + // can be used for AMs is 30%. So, 30% of is + // , which is the guaranteed capacity of "queueA". + // 30% of that (rounded to the nearest 1GB) is . The + // max AM queue limit should never be less than that for any resource. + LeafQueue queueA = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A)); + queueA.setUserLimitFactor(10f); + // Make sure "queueA" knows the total cluster resource. + queueA.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); + // Get "queueA"'s guaranteed capacity (). + Resource capacity = + Resources.multiply(clusterResource, queueA.getCapacity()); + // Limit is the actual resources available to "queueA". The following + // simulates the case where a second queue ("queueB") has "borrowed" almost + // all of "queueA"'s resources because "queueB" has a max capacity of 100% + // and has gone well over its guaranteed capacity. In this case, "queueB" + // has used 99GB of memory and used 505 vCores. This is to make vCores + // dominant in the calculations for the available resources. + Resource limit = Resource.newInstance(1024, 495); + ResourceLimits currentResourceLimits = + new ResourceLimits(limit, Resources.none()); + // Update queueA's limit again to reflect over-usage by queueB. + queueA.updateClusterResource(clusterResource, currentResourceLimits); + Resource expectedAmLimit = Resources.multiply(capacity, + queueA.getMaxAMResourcePerQueuePercent()); + Resource amLimit = queueA.calculateAndGetAMResourceLimit(); + assertTrue("AM memory limit is less than expected: Expected: " + + expectedAmLimit.getMemorySize() + "; Computed: " + + amLimit.getMemorySize(), + amLimit.getMemorySize() >= expectedAmLimit.getMemorySize()); + assertTrue("AM vCore limit is less than expected: Expected: " + + expectedAmLimit.getVirtualCores() + "; Computed: " + + amLimit.getVirtualCores(), + amLimit.getVirtualCores() >= expectedAmLimit.getVirtualCores()); + } }