YARN-10935. AM Total Queue Limit goes below per-user AM Limit if parent is full. Contributed by Eric Payne.
This commit is contained in:
parent
25d0a97557
commit
008bd8afc3
|
@ -741,8 +741,15 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// Current usable resource for this queue and partition is the max of
|
// Current usable resource for this queue and partition is the max of
|
||||||
// queueCurrentLimit and queuePartitionResource.
|
// queueCurrentLimit and queuePartitionResource.
|
||||||
Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
|
// If any of the resources available to this queue are less than queue's
|
||||||
lastClusterResource, queueCurrentLimit, queuePartitionResource);
|
// guarantee, use the guarantee as the queuePartitionUsableResource
|
||||||
|
// because nothing less than the queue's guarantee should be used when
|
||||||
|
// calculating the AM limit.
|
||||||
|
Resource queuePartitionUsableResource =
|
||||||
|
(Resources.fitsIn(resourceCalculator,
|
||||||
|
labelManager.getResourceByLabel(nodePartition, lastClusterResource),
|
||||||
|
queuePartitionResource, queueCurrentLimit)) ?
|
||||||
|
queueCurrentLimit : queuePartitionResource;
|
||||||
|
|
||||||
Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
|
Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
|
||||||
resourceCalculator, queuePartitionUsableResource, amResourcePercent,
|
resourceCalculator, queuePartitionUsableResource, amResourcePercent,
|
||||||
|
|
|
@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||||
|
@ -860,4 +861,82 @@ public class TestApplicationLimits {
|
||||||
rm.killApp(app14.getApplicationId());
|
rm.killApp(app14.getApplicationId());
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that max AM limit is correct in the case where one resource is
|
||||||
|
// depleted but the other is not. Use DominantResourceCalculator.
|
||||||
|
@Test
|
||||||
|
public void testAMResourceLimitWithDRCAndFullParent() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
setupQueueConfiguration(csConf);
|
||||||
|
String qpathA = CapacitySchedulerConfiguration.ROOT + "." + A;
|
||||||
|
String qpathB = CapacitySchedulerConfiguration.ROOT + "." + B;
|
||||||
|
String capacityA = CapacitySchedulerConfiguration.PREFIX + qpathA + "."
|
||||||
|
+ CapacitySchedulerConfiguration.CAPACITY;
|
||||||
|
String capacityB = CapacitySchedulerConfiguration.PREFIX + qpathB + "."
|
||||||
|
+ CapacitySchedulerConfiguration.CAPACITY;
|
||||||
|
csConf.set(capacityA, "30.0");
|
||||||
|
csConf.set(capacityB, "70.0");
|
||||||
|
csConf.setMaximumApplicationMasterResourcePerQueuePercent(qpathA, 0.3f);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
|
||||||
|
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
||||||
|
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||||
|
when(csContext.getConf()).thenReturn(conf);
|
||||||
|
when(csContext.getMinimumResourceCapability()).
|
||||||
|
thenReturn(Resources.createResource(GB));
|
||||||
|
when(csContext.getMaximumResourceCapability()).
|
||||||
|
thenReturn(Resources.createResource(16*GB));
|
||||||
|
when(csContext.getResourceCalculator()).
|
||||||
|
thenReturn(new DominantResourceCalculator());
|
||||||
|
when(csContext.getRMContext()).thenReturn(rmContext);
|
||||||
|
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
|
||||||
|
|
||||||
|
// Total cluster resources.
|
||||||
|
Resource clusterResource = Resources.createResource(100 * GB, 1000);
|
||||||
|
when(csContext.getClusterResource()).thenReturn(clusterResource);
|
||||||
|
|
||||||
|
// Set up queue hierarchy.
|
||||||
|
Map <String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||||
|
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
|
||||||
|
csConf, null, "root", queues, queues, TestUtils.spyHook);
|
||||||
|
rootQueue.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
// Queue "queueA" has a 30% capacity guarantee. The max pct of "queueA" that
|
||||||
|
// can be used for AMs is 30%. So, 30% of <memory: 100GB, vCores: 1000> is
|
||||||
|
// <memory: 30GB, vCores: 300>, which is the guaranteed capacity of "queueA".
|
||||||
|
// 30% of that (rounded to the nearest 1GB) is <memory: 9GB, vCores: 90>. The
|
||||||
|
// max AM queue limit should never be less than that for any resource.
|
||||||
|
LeafQueue queueA = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
|
queueA.setUserLimitFactor(10f);
|
||||||
|
// Make sure "queueA" knows the total cluster resource.
|
||||||
|
queueA.updateClusterResource(clusterResource, new ResourceLimits(
|
||||||
|
clusterResource));
|
||||||
|
// Get "queueA"'s guaranteed capacity (<memory: 30GB, vCores: 300>).
|
||||||
|
Resource capacity =
|
||||||
|
Resources.multiply(clusterResource, queueA.getCapacity());
|
||||||
|
// Limit is the actual resources available to "queueA". The following
|
||||||
|
// simulates the case where a second queue ("queueB") has "borrowed" almost
|
||||||
|
// all of "queueA"'s resources because "queueB" has a max capacity of 100%
|
||||||
|
// and has gone well over its guaranteed capacity. In this case, "queueB"
|
||||||
|
// has used 99GB of memory and used 505 vCores. This is to make vCores
|
||||||
|
// dominant in the calculations for the available resources.
|
||||||
|
Resource limit = Resource.newInstance(1024, 495);
|
||||||
|
ResourceLimits currentResourceLimits =
|
||||||
|
new ResourceLimits(limit, Resources.none());
|
||||||
|
// Update queueA's limit again to reflect over-usage by queueB.
|
||||||
|
queueA.updateClusterResource(clusterResource, currentResourceLimits);
|
||||||
|
Resource expectedAmLimit = Resources.multiply(capacity,
|
||||||
|
queueA.getMaxAMResourcePerQueuePercent());
|
||||||
|
Resource amLimit = queueA.calculateAndGetAMResourceLimit();
|
||||||
|
assertTrue("AM memory limit is less than expected: Expected: " +
|
||||||
|
expectedAmLimit.getMemorySize() + "; Computed: "
|
||||||
|
+ amLimit.getMemorySize(),
|
||||||
|
amLimit.getMemorySize() >= expectedAmLimit.getMemorySize());
|
||||||
|
assertTrue("AM vCore limit is less than expected: Expected: " +
|
||||||
|
expectedAmLimit.getVirtualCores() + "; Computed: "
|
||||||
|
+ amLimit.getVirtualCores(),
|
||||||
|
amLimit.getVirtualCores() >= expectedAmLimit.getVirtualCores());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue