YARN-10869. CS considers only the default maximum-allocation-mb/vcore property as a maximum when it creates dynamic queues (#3504)
Co-authored-by: Benjamin Teke <bteke@cloudera.com>
This commit is contained in:
parent
9e2936f8d1
commit
700045896c
|
@ -100,7 +100,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
String defaultLabelExpression;
|
String defaultLabelExpression;
|
||||||
private String multiNodeSortingPolicyName = null;
|
private String multiNodeSortingPolicyName = null;
|
||||||
|
|
||||||
Map<AccessType, AccessControlList> acls =
|
Map<AccessType, AccessControlList> acls =
|
||||||
new HashMap<AccessType, AccessControlList>();
|
new HashMap<AccessType, AccessControlList>();
|
||||||
volatile boolean reservationsContinueLooking;
|
volatile boolean reservationsContinueLooking;
|
||||||
private volatile boolean preemptionDisabled;
|
private volatile boolean preemptionDisabled;
|
||||||
|
@ -112,7 +112,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
volatile ResourceUsage queueUsage;
|
volatile ResourceUsage queueUsage;
|
||||||
|
|
||||||
private final boolean fullPathQueueNamingPolicy = false;
|
private final boolean fullPathQueueNamingPolicy = false;
|
||||||
|
|
||||||
// Track capacities like used-capacity/abs-used-capacity/capacity/abs-capacity,
|
// Track capacities like used-capacity/abs-used-capacity/capacity/abs-capacity,
|
||||||
// etc.
|
// etc.
|
||||||
QueueCapacities queueCapacities;
|
QueueCapacities queueCapacities;
|
||||||
|
@ -134,7 +134,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
protected CapacityConfigType capacityConfigType =
|
protected CapacityConfigType capacityConfigType =
|
||||||
CapacityConfigType.NONE;
|
CapacityConfigType.NONE;
|
||||||
|
|
||||||
private final RecordFactory recordFactory =
|
private final RecordFactory recordFactory =
|
||||||
RecordFactoryProvider.getRecordFactory(null);
|
RecordFactoryProvider.getRecordFactory(null);
|
||||||
protected CapacitySchedulerContext csContext;
|
protected CapacitySchedulerContext csContext;
|
||||||
protected YarnAuthorizationProvider authorizer = null;
|
protected YarnAuthorizationProvider authorizer = null;
|
||||||
|
@ -250,12 +250,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public QueueState getState() {
|
public QueueState getState() {
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CSQueueMetrics getMetrics() {
|
public CSQueueMetrics getMetrics() {
|
||||||
return metrics;
|
return metrics;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getQueueShortName() {
|
public String getQueueShortName() {
|
||||||
return queueName;
|
return queueName;
|
||||||
|
@ -283,7 +283,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public void setParent(CSQueue newParentQueue) {
|
public void setParent(CSQueue newParentQueue) {
|
||||||
this.parent = newParentQueue;
|
this.parent = newParentQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<String> getAccessibleNodeLabels() {
|
public Set<String> getAccessibleNodeLabels() {
|
||||||
return accessibleLabels;
|
return accessibleLabels;
|
||||||
}
|
}
|
||||||
|
@ -344,7 +344,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public String getDefaultNodeLabelExpression() {
|
public String getDefaultNodeLabelExpression() {
|
||||||
return defaultLabelExpression;
|
return defaultLabelExpression;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setupQueueConfigs(Resource clusterResource)
|
void setupQueueConfigs(Resource clusterResource)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
||||||
|
@ -471,8 +471,14 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
|
private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
|
||||||
String myQueuePath = getQueuePath();
|
String myQueuePath = getQueuePath();
|
||||||
|
/* YARN-10869: When using AutoCreatedLeafQueues, the passed configuration
|
||||||
|
* object is a cloned one containing only the template configs
|
||||||
|
* (see ManagedParentQueue#getLeafQueueConfigs). To ensure that the actual
|
||||||
|
* cluster maximum allocation is fetched the original config object should
|
||||||
|
* be used.
|
||||||
|
*/
|
||||||
Resource clusterMax = ResourceUtils
|
Resource clusterMax = ResourceUtils
|
||||||
.fetchMaximumAllocationFromConfig(csConf);
|
.fetchMaximumAllocationFromConfig(this.csContext.getConfiguration());
|
||||||
Resource queueMax = csConf.getQueueMaximumAllocation(myQueuePath);
|
Resource queueMax = csConf.getQueueMaximumAllocation(myQueuePath);
|
||||||
|
|
||||||
maximumAllocation = Resources.clone(
|
maximumAllocation = Resources.clone(
|
||||||
|
@ -740,7 +746,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
stats.setReservedContainers(getMetrics().getReservedContainers());
|
stats.setReservedContainers(getMetrics().getReservedContainers());
|
||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, QueueConfigurations> getQueueConfigurations() {
|
public Map<String, QueueConfigurations> getQueueConfigurations() {
|
||||||
Map<String, QueueConfigurations> queueConfigurations = new HashMap<>();
|
Map<String, QueueConfigurations> queueConfigurations = new HashMap<>();
|
||||||
Set<String> nodeLabels = getNodeLabelsForQueue();
|
Set<String> nodeLabels = getNodeLabelsForQueue();
|
||||||
|
@ -776,12 +782,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public Resource getMaximumAllocation() {
|
public Resource getMaximumAllocation() {
|
||||||
return maximumAllocation;
|
return maximumAllocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public Resource getMinimumAllocation() {
|
public Resource getMinimumAllocation() {
|
||||||
return minimumAllocation;
|
return minimumAllocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
void allocateResource(Resource clusterResource,
|
void allocateResource(Resource clusterResource,
|
||||||
Resource resource, String nodePartition) {
|
Resource resource, String nodePartition) {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -796,7 +802,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void releaseResource(Resource clusterResource,
|
protected void releaseResource(Resource clusterResource,
|
||||||
Resource resource, String nodePartition) {
|
Resource resource, String nodePartition) {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
@ -811,12 +817,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public boolean getReservationContinueLooking() {
|
public boolean getReservationContinueLooking() {
|
||||||
return reservationsContinueLooking;
|
return reservationsContinueLooking;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public Map<AccessType, AccessControlList> getACLs() {
|
public Map<AccessType, AccessControlList> getACLs() {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
|
@ -841,12 +847,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
public boolean getIntraQueuePreemptionDisabledInHierarchy() {
|
public boolean getIntraQueuePreemptionDisabledInHierarchy() {
|
||||||
return intraQueuePreemptionDisabledInHierarchy;
|
return intraQueuePreemptionDisabledInHierarchy;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public QueueCapacities getQueueCapacities() {
|
public QueueCapacities getQueueCapacities() {
|
||||||
return queueCapacities;
|
return queueCapacities;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public ResourceUsage getQueueResourceUsage() {
|
public ResourceUsage getQueueResourceUsage() {
|
||||||
return queueUsage;
|
return queueUsage;
|
||||||
|
@ -1018,7 +1024,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
// all queues on this label equals to total resource with the label.
|
// all queues on this label equals to total resource with the label.
|
||||||
return labelManager.getResourceByLabel(nodePartition, clusterResource);
|
return labelManager.getResourceByLabel(nodePartition, clusterResource);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Resources.none();
|
return Resources.none();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1159,7 +1165,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
parent.incPendingResource(nodeLabel, resourceToInc);
|
parent.incPendingResource(nodeLabel, resourceToInc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
|
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
|
||||||
if (nodeLabel == null) {
|
if (nodeLabel == null) {
|
||||||
|
@ -1171,7 +1177,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
parent.decPendingResource(nodeLabel, resourceToDec);
|
parent.decPendingResource(nodeLabel, resourceToDec);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void incUsedResource(String nodeLabel, Resource resourceToInc,
|
public void incUsedResource(String nodeLabel, Resource resourceToInc,
|
||||||
SchedulerApplicationAttempt application) {
|
SchedulerApplicationAttempt application) {
|
||||||
|
@ -1206,14 +1212,14 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return if the queue has pending resource on given nodePartition and
|
* Return if the queue has pending resource on given nodePartition and
|
||||||
* schedulingMode.
|
* schedulingMode.
|
||||||
*/
|
*/
|
||||||
boolean hasPendingResourceRequest(String nodePartition,
|
boolean hasPendingResourceRequest(String nodePartition,
|
||||||
Resource cluster, SchedulingMode schedulingMode) {
|
Resource cluster, SchedulingMode schedulingMode) {
|
||||||
return SchedulerUtils.hasPendingResourceRequest(resourceCalculator,
|
return SchedulerUtils.hasPendingResourceRequest(resourceCalculator,
|
||||||
queueUsage, nodePartition, cluster, schedulingMode);
|
queueUsage, nodePartition, cluster, schedulingMode);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean accessibleToPartition(String nodePartition) {
|
public boolean accessibleToPartition(String nodePartition) {
|
||||||
// if queue's label is *, it can access any node
|
// if queue's label is *, it can access any node
|
||||||
if (accessibleLabels != null
|
if (accessibleLabels != null
|
||||||
|
|
|
@ -953,6 +953,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setQueueMaximumAllocation(String queue, String maximumAllocation) {
|
||||||
|
String queuePrefix = getQueuePrefix(queue);
|
||||||
|
set(queuePrefix + MAXIMUM_ALLOCATION, maximumAllocation);
|
||||||
|
}
|
||||||
|
|
||||||
public long getQueueMaximumAllocationMb(String queue) {
|
public long getQueueMaximumAllocationMb(String queue) {
|
||||||
String queuePrefix = getQueuePrefix(queue);
|
String queuePrefix = getQueuePrefix(queue);
|
||||||
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
|
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
|
||||||
|
@ -2061,6 +2066,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
setDefaultNodeLabelExpression(leafQueueConfPrefix, expression);
|
setDefaultNodeLabelExpression(leafQueueConfPrefix, expression);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setAutoCreatedLeafQueueConfigMaximumAllocation(String
|
||||||
|
queuePath, String expression) {
|
||||||
|
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
|
||||||
|
queuePath);
|
||||||
|
setQueueMaximumAllocation(leafQueueConfPrefix, expression);
|
||||||
|
}
|
||||||
|
|
||||||
public static String getUnits(String resourceValue) {
|
public static String getUnits(String resourceValue) {
|
||||||
String units;
|
String units;
|
||||||
for (int i = 0; i < resourceValue.length(); i++) {
|
for (int i = 0; i < resourceValue.length(); i++) {
|
||||||
|
|
|
@ -385,6 +385,9 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f);
|
conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f);
|
||||||
conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
|
conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100);
|
||||||
conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
|
conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
|
||||||
|
conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f);
|
||||||
|
conf.setAutoCreatedLeafQueueConfigMaximumAllocation(C,
|
||||||
|
"memory-mb=10240,vcores=6");
|
||||||
|
|
||||||
conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_GPU,
|
conf.setAutoCreatedLeafQueueTemplateCapacityByLabel(C, NODEL_LABEL_GPU,
|
||||||
NODE_LABEL_GPU_TEMPLATE_CAPACITY);
|
NODE_LABEL_GPU_TEMPLATE_CAPACITY);
|
||||||
|
@ -540,9 +543,31 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
schedConf.setInt(YarnConfiguration.RESOURCE_TYPES
|
schedConf.setInt(YarnConfiguration.RESOURCE_TYPES
|
||||||
+ ".memory-mb.maximum-allocation", 16384);
|
+ ".memory-mb.maximum-allocation", 16384);
|
||||||
|
|
||||||
|
|
||||||
return new CapacitySchedulerConfiguration(schedConf);
|
return new CapacitySchedulerConfiguration(schedConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setSchedulerMinMaxAllocation(CapacitySchedulerConfiguration conf) {
|
||||||
|
unsetMinMaxAllocation(conf);
|
||||||
|
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 8);
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024);
|
||||||
|
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 18384);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void unsetMinMaxAllocation(CapacitySchedulerConfiguration conf) {
|
||||||
|
conf.unset(YarnConfiguration.RESOURCE_TYPES
|
||||||
|
+ ".vcores.minimum-allocation");
|
||||||
|
conf.unset(YarnConfiguration.RESOURCE_TYPES
|
||||||
|
+ ".vcores.maximum-allocation");
|
||||||
|
conf.unset(YarnConfiguration.RESOURCE_TYPES
|
||||||
|
+ ".memory-mb.minimum-allocation");
|
||||||
|
conf.unset(YarnConfiguration.RESOURCE_TYPES
|
||||||
|
+ ".memory-mb.maximum-allocation");
|
||||||
|
}
|
||||||
|
|
||||||
protected MockRM setupSchedulerInstance() throws Exception {
|
protected MockRM setupSchedulerInstance() throws Exception {
|
||||||
|
|
||||||
if (mockRM != null) {
|
if (mockRM != null) {
|
||||||
|
@ -640,10 +665,11 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void validateContainerLimits(
|
protected void validateContainerLimits(
|
||||||
AutoCreatedLeafQueue autoCreatedLeafQueue) {
|
AutoCreatedLeafQueue autoCreatedLeafQueue, int vCoreLimit,
|
||||||
assertEquals(8,
|
long memorySize) {
|
||||||
|
assertEquals(vCoreLimit,
|
||||||
autoCreatedLeafQueue.getMaximumAllocation().getVirtualCores());
|
autoCreatedLeafQueue.getMaximumAllocation().getVirtualCores());
|
||||||
assertEquals(16384,
|
assertEquals(memorySize,
|
||||||
autoCreatedLeafQueue.getMaximumAllocation().getMemorySize());
|
autoCreatedLeafQueue.getMaximumAllocation().getMemorySize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,7 +137,7 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||||
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
|
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
|
||||||
|
|
||||||
validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000);
|
validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000);
|
||||||
validateContainerLimits(autoCreatedLeafQueue);
|
validateContainerLimits(autoCreatedLeafQueue, 6, 10240);
|
||||||
|
|
||||||
assertTrue(autoCreatedLeafQueue
|
assertTrue(autoCreatedLeafQueue
|
||||||
.getOrderingPolicy() instanceof FairOrderingPolicy);
|
.getOrderingPolicy() instanceof FairOrderingPolicy);
|
||||||
|
@ -166,6 +166,35 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testAutoCreateLeafQueueCreationSchedulerMaximumAllocation()
|
||||||
|
throws Exception {
|
||||||
|
try {
|
||||||
|
// Check the minimum/maximum allocation settings via the
|
||||||
|
// yarn.scheduler.minimum/maximum-allocation-mb/vcore property
|
||||||
|
setSchedulerMinMaxAllocation(cs.getConfiguration());
|
||||||
|
cs.getConfiguration().setAutoCreatedLeafQueueConfigMaximumAllocation(C,
|
||||||
|
"memory-mb=18384,vcores=8");
|
||||||
|
cs.reinitialize(cs.getConfiguration(), mockRM.getRMContext());
|
||||||
|
|
||||||
|
// submit an app
|
||||||
|
submitApp(mockRM, cs.getQueue(PARENT_QUEUE), USER0, USER0, 1, 1);
|
||||||
|
|
||||||
|
// check preconditions
|
||||||
|
List<ApplicationAttemptId> appsInC = cs.getAppsInQueue(PARENT_QUEUE);
|
||||||
|
assertEquals(1, appsInC.size());
|
||||||
|
assertNotNull(cs.getQueue(USER0));
|
||||||
|
|
||||||
|
AutoCreatedLeafQueue autoCreatedLeafQueue =
|
||||||
|
(AutoCreatedLeafQueue) cs.getQueue(USER0);
|
||||||
|
|
||||||
|
validateContainerLimits(autoCreatedLeafQueue, 8, 18384);
|
||||||
|
} finally {
|
||||||
|
cleanupQueue(USER0);
|
||||||
|
cleanupQueue(TEST_GROUPUSER);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 20000)
|
@Test(timeout = 20000)
|
||||||
public void testAutoCreateLeafQueueCreationUsingFullParentPath()
|
public void testAutoCreateLeafQueueCreationUsingFullParentPath()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
@ -825,7 +854,7 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||||
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
|
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
|
||||||
|
|
||||||
validateUserAndAppLimits(user3Queue, 900, 900);
|
validateUserAndAppLimits(user3Queue, 900, 900);
|
||||||
validateContainerLimits(user3Queue);
|
validateContainerLimits(user3Queue, 6, 10240);
|
||||||
|
|
||||||
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
|
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
|
||||||
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
|
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue)
|
||||||
|
|
Loading…
Reference in New Issue