YARN-10949. Simplify AbstractCSQueue#updateMaxAppRelatedField and find a more meaningful name for this method. Contributed by Andras Gyori

This commit is contained in:
Andras Gyori 2021-10-20 12:56:41 +02:00 committed by GitHub
parent 414d40155c
commit 35b8441fd9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 91 additions and 76 deletions

View File

@ -1462,53 +1462,6 @@ private Resource getOrInheritMaxResource(Resource resourceByLabel, String label)
configuredMaxResource, parentMaxResource));
}
void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf,
LeafQueue leafQueue) {
int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath);
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
String maxLabel = RMNodeLabelsManager.NO_LABEL;
if (maxApplications < 0) {
for (String label : configuredNodeLabels) {
int maxApplicationsByLabel = 0;
if (maxGlobalPerQueueApps > 0) {
// In absolute mode, should
// shrink when change to corresponding label capacity.
maxApplicationsByLabel = this.capacityConfigType
!= CapacityConfigType.ABSOLUTE_RESOURCE ?
maxGlobalPerQueueApps :
(int) (maxGlobalPerQueueApps * queueCapacities
.getAbsoluteCapacity(label));
} else {
maxApplicationsByLabel = (int) (conf.getMaximumSystemApplications()
* queueCapacities.getAbsoluteCapacity(label));
}
if (maxApplicationsByLabel > maxApplications) {
maxApplications = maxApplicationsByLabel;
maxLabel = label;
}
}
}
leafQueue.setMaxApplications(maxApplications);
int maxApplicationsPerUser = Math.min(maxApplications,
(int) (maxApplications
* (leafQueue.getUsersManager().getUserLimit() / 100.0f)
* leafQueue.getUsersManager().getUserLimitFactor()));
if (leafQueue.getUsersManager().getUserLimitFactor() == -1) {
maxApplicationsPerUser = maxApplications;
}
leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
LOG.info("LeafQueue:" + leafQueue.getQueuePath() +
"update max app related, maxApplications="
+ maxApplications + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
.getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
.getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
.getMaximumCapacity(maxLabel));
}
void deriveCapacityFromAbsoluteConfigurations(String label,
Resource clusterResource) {
// Update capacity with a float calculated from the parent's minResources

View File

@ -461,6 +461,10 @@ private String getNodeLabelPrefix(String queue, String label) {
return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
}
public void setMaximumSystemApplications(int numMaxApps) {
setInt(MAXIMUM_SYSTEM_APPLICATIONS, numMaxApps);
}
public int getMaximumSystemApplications() {
int maxApplications =
getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);

View File

@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -1939,8 +1938,9 @@ public void updateClusterResource(Resource clusterResource,
updateAbsoluteCapacities();
super.updateEffectiveResources(clusterResource);
super.updateMaxAppRelatedField(csContext.getConfiguration(),
this);
// Update maximum applications for the queue and for users
updateMaximumApplications(csContext.getConfiguration());
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
@ -2326,6 +2326,58 @@ public void stopQueue() {
}
}
void updateMaximumApplications(CapacitySchedulerConfiguration conf) {
int maxAppsForQueue = conf.getMaximumApplicationsPerQueue(getQueuePath());
int maxDefaultPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
int maxSystemApps = conf.getMaximumSystemApplications();
int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
Math.min(maxDefaultPerQueueApps, maxSystemApps)
: maxSystemApps;
String maxLabel = RMNodeLabelsManager.NO_LABEL;
if (maxAppsForQueue < 0) {
if (maxDefaultPerQueueApps > 0 && this.capacityConfigType
!= CapacityConfigType.ABSOLUTE_RESOURCE) {
maxAppsForQueue = baseMaxApplications;
} else {
for (String label : configuredNodeLabels) {
int maxApplicationsByLabel = (int) (baseMaxApplications
* queueCapacities.getAbsoluteCapacity(label));
if (maxApplicationsByLabel > maxAppsForQueue) {
maxAppsForQueue = maxApplicationsByLabel;
maxLabel = label;
}
}
}
}
setMaxApplications(maxAppsForQueue);
updateMaxAppsPerUser();
LOG.info("LeafQueue:" + getQueuePath() +
"update max app related, maxApplications="
+ maxAppsForQueue + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
.getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
.getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
.getMaximumCapacity(maxLabel));
}
private void updateMaxAppsPerUser() {
int maxAppsPerUser = maxApplications;
if (getUsersManager().getUserLimitFactor() != -1) {
int maxApplicationsWithUserLimits = (int) (maxApplications
* (getUsersManager().getUserLimit() / 100.0f)
* getUsersManager().getUserLimitFactor());
maxAppsPerUser = Math.min(maxApplications,
maxApplicationsWithUserLimits);
}
setMaxApplicationsPerUser(maxAppsPerUser);
}
/**
* Get all valid users in this queue.
* @return user list

View File

@ -870,29 +870,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
+ "submission of application: " + app3.getApplicationId(),
app3.getDiagnostics().toString());
// based on Global limit of queue usert application is rejected
RMApp app11 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("d")
.withWaitForAppAcceptedState(false)
.build());
rm.drainEvents();
rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app11.getState());
RMApp app12 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("d")
.withWaitForAppAcceptedState(false)
.build());
rm.drainEvents();
rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app12.getState());
// based on per user max app settings, app should be rejected instantly
RMApp app13 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
@ -906,10 +884,32 @@ public RMNodeLabelsManager createNodeLabelManager() {
assertEquals(RMAppState.FAILED, app13.getState());
assertEquals(
"org.apache.hadoop.security.AccessControlException: Queue"
+ " root.d already has 2 applications from user user cannot"
+ " root.d already has 0 applications from user user cannot"
+ " accept submission of application: " + app13.getApplicationId(),
app13.getDiagnostics().toString());
RMApp app11 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user2")
.withAcls(null)
.withQueue("a2")
.withWaitForAppAcceptedState(false)
.build());
rm.drainEvents();
rm.waitForState(app11.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app11.getState());
RMApp app12 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("app")
.withUser("user2")
.withAcls(null)
.withQueue("a2")
.withWaitForAppAcceptedState(false)
.build());
rm.drainEvents();
rm.waitForState(app12.getApplicationId(), RMAppState.ACCEPTED);
assertEquals(RMAppState.ACCEPTED, app12.getState());
// based on system max limit application is rejected
RMApp app14 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
@ -938,7 +938,6 @@ public RMNodeLabelsManager createNodeLabelManager() {
app15.getDiagnostics().toString());
rm.killApp(app2.getApplicationId());
rm.killApp(app11.getApplicationId());
rm.killApp(app13.getApplicationId());
rm.killApp(app14.getApplicationId());
rm.stop();

View File

@ -5137,6 +5137,13 @@ public void testSetupQueueConfigsWithSpecifiedConfiguration()
assertEquals(1.0, leafQueue.getAbsoluteMaximumCapacity(),
EPSILON);
// limit maximum apps by max system apps
csConf.setMaximumSystemApplications(15);
leafQueue.updateClusterResource(Resource.newInstance(0, 0),
new ResourceLimits(Resource.newInstance(0, 0)));
assertEquals(15, leafQueue.getMaxApplications());
} finally {
//revert config changes
csConf.setNodeLocalityDelay(

View File

@ -1137,7 +1137,7 @@ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
// Set GlobalMaximumApplicationsPerQueue in csConf
csConf.setGlobalMaximumApplicationsPerQueue(20000);
csConf.setGlobalMaximumApplicationsPerQueue(8000);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));