YARN-7619. Max AM Resource value in Capacity Scheduler UI has to be refreshed for every user. Contributed by Eric Payne.

This commit is contained in:
Sunil G 2018-01-05 14:42:17 +05:30
parent dc735b286b
commit 0c75d0634b
7 changed files with 86 additions and 18 deletions

View File

@ -64,7 +64,7 @@ public class AbstractResourceUsage {
// be written by ordering policies
USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), CACHED_PENDING(
5), AMLIMIT(6), MIN_RESOURCE(7), MAX_RESOURCE(8), EFF_MIN_RESOURCE(
9), EFF_MAX_RESOURCE(10);
9), EFF_MAX_RESOURCE(10), USERAMLIMIT(11);
private int idx;

View File

@ -268,6 +268,22 @@ public class ResourceUsage extends AbstractResourceUsage {
_set(label, ResourceType.AMLIMIT, res);
}
public Resource getUserAMLimit() {
return getAMLimit(NL);
}
public Resource getUserAMLimit(String label) {
return _get(label, ResourceType.USERAMLIMIT);
}
public void setUserAMLimit(Resource res) {
setAMLimit(NL, res);
}
public void setUserAMLimit(String label, Resource res) {
_set(label, ResourceType.USERAMLIMIT, res);
}
public Resource getCachedDemand(String label) {
try {
readLock.lock();

View File

@ -703,6 +703,7 @@ public class LeafQueue extends AbstractCSQueue {
*/
float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
float preWeightedUserLimit = effectiveUserLimit;
effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
@ -712,10 +713,28 @@ public class LeafQueue extends AbstractCSQueue {
queueCapacities.getMaxAMResourcePercentage(nodePartition)
* effectiveUserLimit * usersManager.getUserLimitFactor(),
minimumAllocation);
return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ?
userAMLimit :
getAMResourceLimitPerPartition(nodePartition);
userAMLimit =
Resources.min(resourceCalculator, lastClusterResource,
userAMLimit,
Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
Resource preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition)
* preWeightedUserLimit * usersManager.getUserLimitFactor(),
minimumAllocation);
preWeighteduserAMLimit =
Resources.min(resourceCalculator, lastClusterResource,
preWeighteduserAMLimit,
Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
queueUsage.setUserAMLimit(nodePartition, preWeighteduserAMLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Effective user AM limit for \"" + userName + "\":" +
preWeighteduserAMLimit + ". " + "Effective weighted user AM limit: "
+ userAMLimit + ". User weight: " + userWeight);
}
return userAMLimit;
} finally {
readLock.unlock();
}

View File

@ -144,13 +144,13 @@ class CapacitySchedulerPage extends RmView {
// Get UserInfo from first user to calculate AM Resource Limit per user.
ResourceInfo userAMResourceLimit = null;
ArrayList<UserInfo> usersList = lqinfo.getUsers().getUsersList();
if (usersList.isEmpty()) {
// If no users are present, consider AM Limit for that queue.
if (!usersList.isEmpty()) {
userAMResourceLimit = resourceUsages.getUserAmLimit();
}
// If no users are present or if AM limit per user doesn't exist, retrieve
// AM Limit for that queue.
if (userAMResourceLimit == null) {
userAMResourceLimit = resourceUsages.getAMLimit();
} else {
userAMResourceLimit = usersList.get(0)
.getResourceUsageInfo().getPartitionResourceUsageInfo(label)
.getAMLimit();
}
ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
? new ResourceInfo(Resources.none())
@ -235,11 +235,25 @@ class CapacitySchedulerPage extends RmView {
.$class("ui-state-default").__("Non-Schedulable Apps").__().__().__()
.tbody();
PartitionResourcesInfo queueUsageResources =
lqinfo.getResources().getPartitionResourceUsageInfo(
nodeLabel == null ? "" : nodeLabel);
ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
for (UserInfo userInfo : users) {
ResourceInfo resourcesUsed = userInfo.getResourcesUsed();
PartitionResourcesInfo resourceUsages = userInfo.getResourceUsageInfo()
.getPartitionResourceUsageInfo((nodeLabel == null) ? "" : nodeLabel);
ResourceInfo userAMLimitPerPartition =
queueUsageResources.getUserAmLimit();
// If AM limit per user is null, use the AM limit for the queue level.
if (userAMLimitPerPartition == null) {
userAMLimitPerPartition = queueUsageResources.getAMLimit();
}
if (userInfo.getUserWeight() != 1.0) {
userAMLimitPerPartition =
new ResourceInfo(
Resources.multiply(userAMLimitPerPartition.getResource(),
userInfo.getUserWeight()));
}
if (nodeLabel != null) {
resourcesUsed = userInfo.getResourceUsageInfo()
.getPartitionResourceUsageInfo(nodeLabel).getUsed();
@ -254,7 +268,7 @@ class CapacitySchedulerPage extends RmView {
.td(userInfo.getUserResourceLimit().toString())
.td(String.valueOf(userInfo.getUserWeight()))
.td(resourcesUsed.toString())
.td(resourceUsages.getAMLimit().toString())
.td(userAMLimitPerPartition.toString())
.td(amUsed.toString())
.td(Integer.toString(userInfo.getNumActiveApplications()))
.td(Integer.toString(userInfo.getNumPendingApplications())).__();

View File

@ -33,13 +33,15 @@ public class PartitionResourcesInfo {
private ResourceInfo pending;
private ResourceInfo amUsed;
private ResourceInfo amLimit = new ResourceInfo();
private ResourceInfo userAmLimit;
public PartitionResourcesInfo() {
}
public PartitionResourcesInfo(String partitionName, ResourceInfo used,
ResourceInfo reserved, ResourceInfo pending,
ResourceInfo amResourceUsed, ResourceInfo amResourceLimit) {
ResourceInfo amResourceUsed, ResourceInfo amResourceLimit,
ResourceInfo perUserAmResourceLimit) {
super();
this.partitionName = partitionName;
this.used = used;
@ -47,6 +49,7 @@ public class PartitionResourcesInfo {
this.pending = pending;
this.amUsed = amResourceUsed;
this.amLimit = amResourceLimit;
this.userAmLimit = perUserAmResourceLimit;
}
public String getPartitionName() {
@ -96,4 +99,18 @@ public class PartitionResourcesInfo {
public void setAMLimit(ResourceInfo amLimit) {
this.amLimit = amLimit;
}
/**
* @return the userAmLimit
*/
public ResourceInfo getUserAmLimit() {
return userAmLimit;
}
/**
* @param userAmLimit the userAmLimit to set
*/
public void setUserAmLimit(ResourceInfo userAmLimit) {
this.userAmLimit = userAmLimit;
}
}

View File

@ -51,7 +51,9 @@ public class ResourcesInfo {
considerAMUsage ? new ResourceInfo(resourceUsage
.getAMUsed(partitionName)) : null,
considerAMUsage ? new ResourceInfo(resourceUsage
.getAMLimit(partitionName)) : null));
.getAMLimit(partitionName)) : null,
considerAMUsage ? new ResourceInfo(resourceUsage
.getUserAMLimit(partitionName)) : null));
}
}

View File

@ -508,13 +508,13 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
partitionInfo = partitionsCapsArray.getJSONObject(0);
partitionName = partitionInfo.getString("partitionName");
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0, 50);
assertEquals("incorrect number of elements", 6,
assertEquals("incorrect number of elements", 7,
partitionsResourcesArray.getJSONObject(0).length());
break;
case QUEUE_B:
assertEquals("Invalid default Label expression", LABEL_LX,
queueJson.getString("defaultNodeLabelExpression"));
assertEquals("incorrect number of elements", 6,
assertEquals("incorrect number of elements", 7,
partitionsResourcesArray.getJSONObject(0).length());
verifyAccesibleNodeLabels(queueJson, ImmutableSet.of(LABEL_LX));
assertEquals("incorrect number of partitions", 2,