YARN-9797. LeafQueue#activateApplications should use resourceCalculator#fitsIn. Contributed by Bilwa S T.

(cherry picked from commit 03489124ea)
This commit is contained in:
bibinchundatt 2019-09-03 11:53:34 +05:30
parent 61fd1a74a1
commit 1e6095f16b
2 changed files with 87 additions and 4 deletions

View File

@ -834,8 +834,7 @@ public class LeafQueue extends AbstractCSQueue {
+ " AM node-partition name " + partitionName);
}
if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
amIfStarted, amLimit)) {
if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
resourceCalculator, lastClusterResource,
queueUsage.getAMUsed(partitionName), Resources.none()))) {
@ -870,8 +869,7 @@ public class LeafQueue extends AbstractCSQueue {
application.getAMResource(partitionName),
user.getConsumedAMResources(partitionName));
if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
userAmIfStarted, userAMLimit)) {
if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
resourceCalculator, lastClusterResource,
queueUsage.getAMUsed(partitionName), Resources.none()))) {

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
@ -58,7 +60,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
@ -747,4 +751,85 @@ public class TestApplicationLimitsByPartition {
}
/**
* {@link LeafQueue#activateApplications()} should validate values of all
* resourceTypes before activating application.
*
* @throws Exception
*/
@Test
public void testAMLimitByAllResources() throws Exception {
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class);
String queueName = "a1";
csconf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {queueName});
csconf.setCapacity("root." + queueName, 100);
ResourceInformation res0 = ResourceInformation.newInstance("memory-mb",
ResourceInformation.MEMORY_MB.getUnits(), GB, Long.MAX_VALUE);
ResourceInformation res1 = ResourceInformation.newInstance("vcores",
ResourceInformation.VCORES.getUnits(), 1, Integer.MAX_VALUE);
ResourceInformation res2 = ResourceInformation.newInstance("gpu",
ResourceInformation.GPUS.getUnits(), 0, Integer.MAX_VALUE);
Map<String, ResourceInformation> riMap = new HashMap<>();
riMap.put(ResourceInformation.MEMORY_URI, res0);
riMap.put(ResourceInformation.VCORES_URI, res1);
riMap.put(ResourceInformation.GPU_URI, res2);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
YarnConfiguration config = new YarnConfiguration(csconf);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
config.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
false);
MockRM rm = new MockRM(config);
rm.start();
Map<String, Long> res = new HashMap<>();
res.put("gpu", 0L);
Resource clusterResource = Resource.newInstance(16 * GB, 64, res);
// Cluster Resource - 16GB, 64vcores
// AMLimit 16384 x .1 mb , 64 x .1 vcore
// Effective AM limit after normalized to minimum resource 2048,7
rm.registerNode("127.0.0.1:1234", clusterResource);
String userName = "user_0";
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
LeafQueue queueA = (LeafQueue) ((CapacityScheduler) scheduler)
.getQueue(queueName);
Resource amResource = Resource.newInstance(GB, 1);
rm.submitApp(amResource, "app-1", userName, null, queueName);
rm.submitApp(amResource, "app-2", userName, null, queueName);
// app-3 should not be activated as amLimit will be reached
// for memory
rm.submitApp(amResource, "app-3", userName, null, queueName);
Assert.assertEquals("PendingApplications should be 1", 1,
queueA.getNumPendingApplications());
Assert.assertEquals("Active applications should be 2", 2,
queueA.getNumActiveApplications());
// AMLimit is 2048,7
Assert.assertEquals(2048,
queueA.getQueueResourceUsage().getAMLimit().getMemorySize());
Assert.assertEquals(7,
queueA.getQueueResourceUsage().getAMLimit().getVirtualCores());
// Used AM Resource is 2048,2
Assert.assertEquals(2048,
queueA.getQueueResourceUsage().getAMUsed().getMemorySize());
Assert.assertEquals(2,
queueA.getQueueResourceUsage().getAMUsed().getVirtualCores());
rm.close();
}
}