YARN-9797. LeafQueue#activateApplications should use resourceCalculator#fitsIn. Contributed by Bilwa S T.
(cherry picked from commit 03489124ea
)
This commit is contained in:
parent
63b18d764a
commit
3210d1e993
|
@ -838,8 +838,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
+ " AM node-partition name " + partitionName);
|
+ " AM node-partition name " + partitionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
|
if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
|
||||||
amIfStarted, amLimit)) {
|
|
||||||
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
|
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
|
||||||
resourceCalculator, lastClusterResource,
|
resourceCalculator, lastClusterResource,
|
||||||
queueUsage.getAMUsed(partitionName), Resources.none()))) {
|
queueUsage.getAMUsed(partitionName), Resources.none()))) {
|
||||||
|
@ -874,8 +873,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
application.getAMResource(partitionName),
|
application.getAMResource(partitionName),
|
||||||
user.getConsumedAMResources(partitionName));
|
user.getConsumedAMResources(partitionName));
|
||||||
|
|
||||||
if (!Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
|
if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
|
||||||
userAmIfStarted, userAMLimit)) {
|
|
||||||
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
|
if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
|
||||||
resourceCalculator, lastClusterResource,
|
resourceCalculator, lastClusterResource,
|
||||||
queueUsage.getAMUsed(partitionName), Resources.none()))) {
|
queueUsage.getAMUsed(partitionName), Resources.none()))) {
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
|
@ -58,7 +60,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -747,4 +751,85 @@ public class TestApplicationLimitsByPartition {
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@link LeafQueue#activateApplications()} should validate values of all
|
||||||
|
* resourceTypes before activating application.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testAMLimitByAllResources() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration csconf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||||
|
String queueName = "a1";
|
||||||
|
csconf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] {queueName});
|
||||||
|
csconf.setCapacity("root." + queueName, 100);
|
||||||
|
|
||||||
|
ResourceInformation res0 = ResourceInformation.newInstance("memory-mb",
|
||||||
|
ResourceInformation.MEMORY_MB.getUnits(), GB, Long.MAX_VALUE);
|
||||||
|
ResourceInformation res1 = ResourceInformation.newInstance("vcores",
|
||||||
|
ResourceInformation.VCORES.getUnits(), 1, Integer.MAX_VALUE);
|
||||||
|
ResourceInformation res2 = ResourceInformation.newInstance("gpu",
|
||||||
|
ResourceInformation.GPUS.getUnits(), 0, Integer.MAX_VALUE);
|
||||||
|
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||||
|
riMap.put(ResourceInformation.MEMORY_URI, res0);
|
||||||
|
riMap.put(ResourceInformation.VCORES_URI, res1);
|
||||||
|
riMap.put(ResourceInformation.GPU_URI, res2);
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
|
||||||
|
YarnConfiguration config = new YarnConfiguration(csconf);
|
||||||
|
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
config.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
|
||||||
|
false);
|
||||||
|
|
||||||
|
MockRM rm = new MockRM(config);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
Map<String, Long> res = new HashMap<>();
|
||||||
|
res.put("gpu", 0L);
|
||||||
|
|
||||||
|
Resource clusterResource = Resource.newInstance(16 * GB, 64, res);
|
||||||
|
|
||||||
|
// Cluster Resource - 16GB, 64vcores
|
||||||
|
// AMLimit 16384 x .1 mb , 64 x .1 vcore
|
||||||
|
// Effective AM limit after normalized to minimum resource 2048,7
|
||||||
|
|
||||||
|
rm.registerNode("127.0.0.1:1234", clusterResource);
|
||||||
|
|
||||||
|
String userName = "user_0";
|
||||||
|
ResourceScheduler scheduler = rm.getRMContext().getScheduler();
|
||||||
|
LeafQueue queueA = (LeafQueue) ((CapacityScheduler) scheduler)
|
||||||
|
.getQueue(queueName);
|
||||||
|
|
||||||
|
Resource amResource = Resource.newInstance(GB, 1);
|
||||||
|
|
||||||
|
rm.submitApp(amResource, "app-1", userName, null, queueName);
|
||||||
|
rm.submitApp(amResource, "app-2", userName, null, queueName);
|
||||||
|
|
||||||
|
// app-3 should not be activated as amLimit will be reached
|
||||||
|
// for memory
|
||||||
|
rm.submitApp(amResource, "app-3", userName, null, queueName);
|
||||||
|
|
||||||
|
Assert.assertEquals("PendingApplications should be 1", 1,
|
||||||
|
queueA.getNumPendingApplications());
|
||||||
|
Assert.assertEquals("Active applications should be 2", 2,
|
||||||
|
queueA.getNumActiveApplications());
|
||||||
|
// AMLimit is 2048,7
|
||||||
|
Assert.assertEquals(2048,
|
||||||
|
queueA.getQueueResourceUsage().getAMLimit().getMemorySize());
|
||||||
|
Assert.assertEquals(7,
|
||||||
|
queueA.getQueueResourceUsage().getAMLimit().getVirtualCores());
|
||||||
|
// Used AM Resource is 2048,2
|
||||||
|
Assert.assertEquals(2048,
|
||||||
|
queueA.getQueueResourceUsage().getAMUsed().getMemorySize());
|
||||||
|
Assert.assertEquals(2,
|
||||||
|
queueA.getQueueResourceUsage().getAMUsed().getVirtualCores());
|
||||||
|
|
||||||
|
rm.close();
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue