diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 96d309c547e..ccf33632efd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -771,8 +771,13 @@ public class LeafQueue extends AbstractCSQueue { // Current usable resource for this queue and partition is the max of // queueCurrentLimit and queuePartitionResource. - Resource queuePartitionUsableResource = Resources.max(resourceCalculator, - lastClusterResource, queueCurrentLimit, queuePartitionResource); + // If any of the resources available to this queue are less than queue's + // guarantee, use the guarantee as the queuePartitionUsableResource + // because nothing less than the queue's guarantee should be used when + // calculating the AM limit. + Resource queuePartitionUsableResource = (Resources.fitsIn( + resourceCalculator, queuePartitionResource, queueCurrentLimit)) ? 
+ queueCurrentLimit : queuePartitionResource; Resource amResouceLimit = Resources.multiplyAndNormalizeUp( resourceCalculator, queuePartitionUsableResource, amResourcePercent, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 84608effa31..f4ffa7ba201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; @@ -942,4 +943,77 @@ public class TestApplicationLimits { rm.killApp(app14.getApplicationId()); rm.stop(); } + + // Test that max AM limit is correct in the case where one resource is + // depleted but the other is not. Use DominantResourceCalculator. 
+ @Test + public void testAMResourceLimitWithDRCAndFullParent() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(csConf); + csConf.setFloat(CapacitySchedulerConfiguration. + MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.3f); + YarnConfiguration conf = new YarnConfiguration(); + + CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(16*GB)); + when(csContext.getResourceCalculator()). + thenReturn(new DominantResourceCalculator()); + when(csContext.getRMContext()).thenReturn(rmContext); + when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager()); + + // Total cluster resources. + Resource clusterResource = Resources.createResource(100 * GB, 1000); + when(csContext.getClusterResource()).thenReturn(clusterResource); + + // Set up queue hierarchy. + CSQueueStore queues = new CSQueueStore(); + CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext, + csConf, null, "root", queues, queues, TestUtils.spyHook); + rootQueue.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + // Queue "queueA" has a 30% capacity guarantee. The max pct of "queueA" that + // can be used for AMs is 30%. So, 30% of <memory:100GB, vCores:1000> is + // <memory:30GB, vCores:300>, which is the guaranteed capacity of "queueA". + // 30% of that (rounded to the nearest 1GB) is <memory:9GB, vCores:90>. The + // max AM queue limit should never be less than that for any resource. + LeafQueue queueA = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A)); + queueA.setCapacity(30.0f); + queueA.setUserLimitFactor(10f); + queueA.setMaxAMResourcePerQueuePercent(0.3f); + // Make sure "queueA" knows the total cluster resource. 
+ queueA.updateClusterResource(clusterResource, new ResourceLimits( + clusterResource)); + // Get "queueA"'s guaranteed capacity (<memory:30GB, vCores:300>). + Resource capacity = + Resources.multiply(clusterResource, (queueA.getCapacity()/100)); + // Limit is the actual resources available to "queueA". The following + // simulates the case where a second queue ("queueB") has "borrowed" almost + // all of "queueA"'s resources because "queueB" has a max capacity of 100% + // and has gone well over its guaranteed capacity. In this case, "queueB" + // has used 99GB of memory and used 505 vCores. This is to make vCores + // dominant in the calculations for the available resources. + when(queueA.getEffectiveCapacity(any())).thenReturn(capacity); + Resource limit = Resource.newInstance(1024, 495); + ResourceLimits currentResourceLimits = + new ResourceLimits(limit, Resources.none()); + queueA.updateClusterResource(clusterResource, currentResourceLimits); + Resource expectedAmLimit = Resources.multiply(capacity, + queueA.getMaxAMResourcePerQueuePercent()); + Resource amLimit = queueA.calculateAndGetAMResourceLimit(); + assertTrue("AM memory limit is less than expected: Expected: " + + expectedAmLimit.getMemorySize() + "; Computed: " + + amLimit.getMemorySize(), + amLimit.getMemorySize() >= expectedAmLimit.getMemorySize()); + assertTrue("AM vCore limit is less than expected: Expected: " + + expectedAmLimit.getVirtualCores() + "; Computed: " + + amLimit.getVirtualCores(), + amLimit.getVirtualCores() >= expectedAmLimit.getVirtualCores()); + } }