YARN-5892. Support user-specific minimum user limit percentage in Capacity Scheduler. Contributed by Eric Payne.
commit ca13b224b2 (parent 68dc7c2405)
@@ -66,6 +66,12 @@ public class DefaultResourceCalculator extends ResourceCalculator {
         divideAndCeil(numerator.getMemorySize(), denominator));
   }
 
+  @Override
+  public Resource divideAndCeil(Resource numerator, float denominator) {
+    return Resources.createResource(
+        divideAndCeil(numerator.getMemorySize(), denominator));
+  }
+
   @Override
   public Resource normalize(Resource r, Resource minimumResource,
       Resource maximumResource, Resource stepFactor) {
@@ -154,6 +154,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
         );
   }
 
+  @Override
+  public Resource divideAndCeil(Resource numerator, float denominator) {
+    return Resources.createResource(
+        divideAndCeil(numerator.getMemorySize(), denominator),
+        divideAndCeil(numerator.getVirtualCores(), denominator)
+        );
+  }
+
   @Override
   public Resource normalize(Resource r, Resource minimumResource,
       Resource maximumResource, Resource stepFactor) {
@@ -65,6 +65,13 @@ public abstract class ResourceCalculator {
     }
     return (a + (b - 1)) / b;
   }
 
+  public static int divideAndCeil(int a, float b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (int) Math.ceil(a / b);
+  }
+
   public static long divideAndCeil(long a, long b) {
     if (b == 0) {
@@ -73,6 +80,13 @@ public abstract class ResourceCalculator {
     return (a + (b - 1)) / b;
   }
 
+  public static long divideAndCeil(long a, float b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (long) Math.ceil(a/b);
+  }
+
   public static int roundUp(int a, int b) {
     return divideAndCeil(a, b) * b;
   }
@@ -198,6 +212,15 @@ public abstract class ResourceCalculator {
    * @return resultant resource
    */
   public abstract Resource divideAndCeil(Resource numerator, int denominator);
 
+  /**
+   * Divide-and-ceil <code>numerator</code> by <code>denominator</code>.
+   *
+   * @param numerator numerator resource
+   * @param denominator denominator
+   * @return resultant resource
+   */
+  public abstract Resource divideAndCeil(Resource numerator, float denominator);
+
   /**
    * Check if a smaller resource can be contained by bigger resource.
@@ -279,6 +279,11 @@ public class Resources {
       ResourceCalculator resourceCalculator, Resource lhs, int rhs) {
     return resourceCalculator.divideAndCeil(lhs, rhs);
   }
 
+  public static Resource divideAndCeil(
+      ResourceCalculator resourceCalculator, Resource lhs, float rhs) {
+    return resourceCalculator.divideAndCeil(lhs, rhs);
+  }
+
   public static boolean equals(Resource lhs, Resource rhs) {
     return lhs.equals(rhs);
@@ -111,6 +111,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   protected ReentrantReadWriteLock.WriteLock writeLock;
 
   volatile Priority priority = Priority.newInstance(0);
+  private Map<String, Float> userWeights = new HashMap<String, Float>();
 
   public AbstractCSQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -332,11 +333,28 @@ public abstract class AbstractCSQueue implements CSQueue {
 
       this.priority = csContext.getConfiguration().getQueuePriority(
           getQueuePath());
+
+      this.userWeights = getUserWeightsFromHierarchy();
     } finally {
       writeLock.unlock();
     }
   }
 
+  private Map<String, Float> getUserWeightsFromHierarchy() throws IOException {
+    Map<String, Float> unionInheritedWeights = new HashMap<String, Float>();
+    CSQueue parentQ = getParent();
+    if (parentQ != null) {
+      // Inherit all of parent's user's weights
+      unionInheritedWeights.putAll(parentQ.getUserWeights());
+    }
+    // Insert this queue's user's weights, overriding parent's user's weights if
+    // there is overlap.
+    CapacitySchedulerConfiguration csConf = csContext.getConfiguration();
+    unionInheritedWeights.putAll(
+        csConf.getAllUserWeightsForQueue(getQueuePath()));
+    return unionInheritedWeights;
+  }
+
   private void initializeQueueState(QueueState previousState,
       QueueState configuredState, QueueState parentState) {
     // verify that we can not any value for State other than RUNNING/STOPPED
@@ -956,4 +974,9 @@ public abstract class AbstractCSQueue implements CSQueue {
   public Priority getPriority() {
     return this.priority;
   }
+
+  @Override
+  public Map<String, Float> getUserWeights() {
+    return userWeights;
+  }
 }
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -350,4 +351,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return queue priority
    */
   Priority getPriority();
+
+  /**
+   * Get a map of usernames and weights
+   * @return map of usernames and corresponding weight
+   */
+  Map<String, Float> getUserWeights();
 }
@@ -107,6 +107,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
   @Private
   public static final String USER_LIMIT_FACTOR = "user-limit-factor";
 
+  @Private
+  public static final String USER_WEIGHT = "weight";
+
+  @Private
+  public static final String USER_SETTINGS = "user-settings";
+
+  @Private
+  public static final float DEFAULT_USER_WEIGHT = 1.0f;
+
   @Private
   public static final String STATE = "state";
 
@@ -1412,4 +1421,29 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
         QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
         UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation);
   }
+
+  /**
+   * Get the weights of all users at this queue level from the configuration.
+   * Used in computing user-specific user limit, relative to other users.
+   * @param queuePath full queue path
+   * @return map of user weights, if they exists. Otherwise, return empty map.
+   */
+  public Map<String, Float> getAllUserWeightsForQueue(String queuePath) {
+    Map <String, Float> userWeights = new HashMap <String, Float>();
+    String qPathPlusPrefix =
+        getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.")
+        + USER_SETTINGS + "\\.";
+    String weightKeyRegex =
+        qPathPlusPrefix + "\\w+\\." + USER_WEIGHT;
+    Map<String, String> props = getValByRegex(weightKeyRegex);
+    for (Entry<String, String> e : props.entrySet()) {
+      String userName =
+          e.getKey().replaceFirst(qPathPlusPrefix, "")
+          .replaceFirst("\\." + USER_WEIGHT, "");
+      if (userName != null && !userName.isEmpty()) {
+        userWeights.put(userName, new Float(e.getValue()));
+      }
+    }
+    return userWeights;
+  }
 }
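To make the new key format concrete, here is a minimal usage sketch of getAllUserWeightsForQueue (illustrative only, not part of the patch; the queue path "root.a" and user name "alice" are invented, and the snippet assumes the usual CapacityScheduler imports):

    // Hypothetical sketch: exercise the new accessor against an in-memory config.
    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    conf.setFloat(
        "yarn.scheduler.capacity.root.a.user-settings.alice.weight", 1.5f);
    // Keys of the form yarn.scheduler.capacity.<queue-path>.user-settings.<user>.weight
    // match the regex built above, so this should return a single entry.
    Map<String, Float> weights = conf.getAllUserWeightsForQueue("root.a");
    assert weights.get("alice") == 1.5f;   // alice -> 1.5

Users without an explicit setting simply do not appear in the returned map; callers fall back to the default weight of 1.0.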
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -237,6 +238,20 @@ public class LeafQueue extends AbstractCSQueue {
       defaultAppPriorityPerQueue = Priority.newInstance(
           conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
 
+      // Validate leaf queue's user's weights.
+      int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
+      for (Entry<String, Float> e : getUserWeights().entrySet()) {
+        float val = e.getValue().floatValue();
+        if (val < 0.0f || val > (100.0f / queueUL)) {
+          throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
+              + "\" must be between 0 and" + " 100 / " + queueUL + " (= " +
+              100.0f/queueUL + ", the number of concurrent active users in "
+              + getQueuePath() + ")");
+        }
+      }
+
+      usersManager.updateUserWeights();
+
       LOG.info(
           "Initializing " + queueName + "\n" + "capacity = " + queueCapacities
               .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n"
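For the bound check above (an illustrative reading, not new behavior): with minimum-user-limit-percent set to 50, the largest accepted weight is 100 / 50 = 2.0, so a configured weight of, say, 2.5 for any user in that queue would make queue (re)initialization fail with the IOException shown.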
@@ -619,11 +634,16 @@ public class LeafQueue extends AbstractCSQueue {
 
   @VisibleForTesting
   public Resource getUserAMResourceLimit() {
-    return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
+    return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
+        null);
   }
 
   public Resource getUserAMResourceLimitPerPartition(
-      String nodePartition) {
+      String nodePartition, String userName) {
+    float userWeight = 1.0f;
+    if (userName != null && getUser(userName) != null) {
+      userWeight = getUser(userName).getWeight();
+    }
     try {
       readLock.lock();
       /*
@@ -634,6 +654,7 @@ public class LeafQueue extends AbstractCSQueue {
        */
       float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
           1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
+      effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
 
       Resource queuePartitionResource = Resources
           .multiplyAndNormalizeUp(resourceCalculator,
@@ -774,7 +795,8 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Verify whether we already calculated user-am-limit for this label.
       if (userAMLimit == null) {
-        userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
+        userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
+            application.getUser());
         userAmPartitionLimit.put(partitionName, userAMLimit);
       }
 
@@ -37,11 +37,14 @@ public class UserInfo {
   protected ResourceInfo AMResourceUsed;
   protected ResourceInfo userResourceLimit;
   protected ResourcesInfo resources;
+  private float userWeight;
+  private boolean isActive;
 
   UserInfo() {}
 
   UserInfo(String username, Resource resUsed, int activeApps, int pendingApps,
-      Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) {
+      Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage,
+      float weight, boolean isActive) {
     this.username = username;
     this.resourcesUsed = new ResourceInfo(resUsed);
     this.numActiveApplications = activeApps;
@@ -49,6 +52,8 @@ public class UserInfo {
     this.AMResourceUsed = new ResourceInfo(amResUsed);
     this.userResourceLimit = new ResourceInfo(resourceLimit);
     this.resources = new ResourcesInfo(resourceUsage);
+    this.userWeight = weight;
+    this.isActive = isActive;
   }
 
   public String getUsername() {
@@ -78,4 +83,12 @@ public class UserInfo {
   public ResourcesInfo getResourceUsageInfo() {
     return resources;
   }
+
+  public float getUserWeight() {
+    return userWeight;
+  }
+
+  public boolean getIsActive() {
+    return isActive;
+  }
 }
@@ -92,6 +92,9 @@ public class UsersManager implements AbstractUsersManager {
   Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
   Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
 
+  private float activeUsersTimesWeights = 0.0f;
+  private float allUsersTimesWeights = 0.0f;
+
   /**
    * UsageRatios will store the total used resources ratio across all users of
    * the queue.
@@ -158,6 +161,7 @@ public class UsersManager implements AbstractUsersManager {
 
     private UsageRatios userUsageRatios = new UsageRatios();
     private WriteLock writeLock;
+    private float weight;
 
     public User(String name) {
       ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -262,6 +266,20 @@ public class UsersManager implements AbstractUsersManager {
     public void setResourceUsage(ResourceUsage resourceUsage) {
       this.userResourceUsage = resourceUsage;
     }
+
+    /**
+     * @return the weight
+     */
+    public float getWeight() {
+      return weight;
+    }
+
+    /**
+     * @param weight the weight to set
+     */
+    public void setWeight(float weight) {
+      this.weight = weight;
+    }
   } /* End of User class */
 
   /**
@@ -382,6 +400,8 @@ public class UsersManager implements AbstractUsersManager {
       // Remove user from active/non-active list as well.
       activeUsersSet.remove(userName);
       nonActiveUsersSet.remove(userName);
+      activeUsersTimesWeights = sumActiveUsersTimesWeights();
+      allUsersTimesWeights = sumAllUsersTimesWeights();
     } finally {
       writeLock.unlock();
     }
@@ -418,6 +438,8 @@ public class UsersManager implements AbstractUsersManager {
    */
   private void addUser(String userName, User user) {
     this.users.put(userName, user);
+    user.setWeight(getUserWeightFromQueue(userName));
+    allUsersTimesWeights = sumAllUsersTimesWeights();
   }
 
   /**
@@ -434,7 +456,8 @@ public class UsersManager implements AbstractUsersManager {
             user.getActiveApplications(), user.getPendingApplications(),
             Resources.clone(user.getConsumedAMResources()),
             Resources.clone(user.getUserResourceLimit()),
-            user.getResourceUsage()));
+            user.getResourceUsage(), user.getWeight(),
+            activeUsersSet.contains(user.userName)));
       }
       return usersToReturn;
     } finally {
@@ -442,6 +465,11 @@ public class UsersManager implements AbstractUsersManager {
     }
   }
 
+  private float getUserWeightFromQueue(String userName) {
+    Float weight = lQueue.getUserWeights().get(userName);
+    return (weight == null) ? 1.0f : weight.floatValue();
+  }
+
   /**
    * Get computed user-limit for all ACTIVE users in this queue. If cached data
    * is invalidated due to resource change, this method also enforce to
@@ -480,13 +508,24 @@ public class UsersManager implements AbstractUsersManager {
       writeLock.unlock();
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("userLimit is fetched. userLimit = "
-          + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
-          + schedulingMode + ", partition=" + nodePartition);
+    Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
+    User user = getUser(userName);
+    float weight = (user == null) ? 1.0f : user.getWeight();
+    Resource userSpecificUserLimit =
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            userLimitResource, weight, lQueue.getMinimumAllocation());
+
+    if (user != null) {
+      user.setUserResourceLimit(userSpecificUserLimit);
     }
 
-    return userLimitPerSchedulingMode.get(schedulingMode);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
+          + ", userSpecificUserLimit=" + userSpecificUserLimit
+          + ", schedulingMode=" + schedulingMode
+          + ", partition=" + nodePartition);
+    }
+    return userSpecificUserLimit;
   }
 
   /**
@@ -527,13 +566,21 @@ public class UsersManager implements AbstractUsersManager {
       writeLock.unlock();
     }
 
+    Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
+    User user = getUser(userName);
+    float weight = (user == null) ? 1.0f : user.getWeight();
+    Resource userSpecificUserLimit =
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            userLimitResource, weight, lQueue.getMinimumAllocation());
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("userLimit is fetched. userLimit = "
-          + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
-          + schedulingMode + ", partition=" + nodePartition);
+      LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
+          + ", userSpecificUserLimit=" + userSpecificUserLimit
+          + ", schedulingMode=" + schedulingMode
+          + ", partition=" + nodePartition);
     }
 
-    return userLimitPerSchedulingMode.get(schedulingMode);
+    return userSpecificUserLimit;
   }
 
   /*
@@ -656,16 +703,19 @@ public class UsersManager implements AbstractUsersManager {
         queueCapacity, required);
 
     /*
-     * We want to base the userLimit calculation on max(queueCapacity,
-     * usedResources+required). However, we want usedResources to be based on
-     * the combined ratios of all the users in the queue so we use consumedRatio
-     * to calculate such. The calculation is dependent on how the
-     * resourceCalculator calculates the ratio between two Resources. DRF
-     * Example: If usedResources is greater than queueCapacity and users have
-     * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is
-     * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio
-     * is then 20+30=50%. Yes, this value can be larger than 100% but for the
-     * purposes of making sure all users are getting their fair share, it works.
+     * We want to base the userLimit calculation on
+     * max(queueCapacity, usedResources+required). However, we want
+     * usedResources to be based on the combined ratios of all the users in the
+     * queue so we use consumedRatio to calculate such.
+     * The calculation is dependent on how the resourceCalculator calculates the
+     * ratio between two Resources. DRF Example: If usedResources is greater
+     * than queueCapacity and users have the following [mem,cpu] usages:
+     *
+     * User1: [10%,20%] - Dominant resource is 20%
+     * User2: [30%,10%] - Dominant resource is 30%
+     * Then total consumedRatio is then 20+30=50%. Yes, this value can be
+     * larger than 100% but for the purposes of making sure all users are
+     * getting their fair share, it works.
      */
     Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
         partitionResource, getUsageRatio(nodePartition),
@@ -680,23 +730,23 @@ public class UsersManager implements AbstractUsersManager {
      * capacity * user-limit-factor. Also, the queue's configured capacity
      * should be higher than queue-hard-limit * ulMin
      */
-    int usersCount = getNumActiveUsers();
+    float usersSummedByWeight = activeUsersTimesWeights;
     Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
 
     // For non-activeUser calculation, consider all users count.
     if (!activeUser) {
       resourceUsed = currentCapacity;
-      usersCount = users.size();
+      usersSummedByWeight = allUsersTimesWeights;
     }
 
     /*
-     * User limit resource is determined by: max{currentCapacity / #activeUsers,
+     * User limit resource is determined by: max(currentCapacity / #activeUsers,
      * currentCapacity * user-limit-percentage%)
      */
     Resource userLimitResource = Resources.max(resourceCalculator,
         partitionResource,
         Resources.divideAndCeil(resourceCalculator, resourceUsed,
-            usersCount),
+            usersSummedByWeight),
         Resources.divideAndCeil(resourceCalculator,
             Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
             100));
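As a rough worked example of the change above (assumed numbers only, not code from the patch): the plain active-user count in the divide-and-ceil is replaced by the sum of the active users' weights, and the resulting queue-wide limit is later scaled by each user's own weight. A self-contained sketch of that arithmetic:

    // Illustrative sketch with invented numbers: two active users, user_0
    // weighted 1.5 and user_1 weighted 1.0, 6 GB of active usage, 8 GB of
    // current capacity, minimum-user-limit-percent = 50.
    public class WeightedUserLimitSketch {
      public static void main(String[] args) {
        float usersSummedByWeight = 1.5f + 1.0f;   // 2.5 instead of a count of 2
        long resourceUsedMB = 6 * 1024;
        long currentCapacityMB = 8 * 1024;
        long byWeightedCount =
            (long) Math.ceil(resourceUsedMB / usersSummedByWeight); // 2458 MB
        long byPercent = currentCapacityMB * 50 / 100;              // 4096 MB
        long queueWideUserLimitMB = Math.max(byWeightedCount, byPercent);
        // Each user's final limit is the queue-wide value scaled by that user's
        // own weight (and normalized to the minimum allocation),
        // e.g. 4096 MB * 1.5 = 6144 MB for user_0.
        System.out.println(queueWideUserLimitMB + " MB queue-wide, "
            + (long) (queueWideUserLimitMB * 1.5f) + " MB for user_0");
      }
    }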
@@ -727,18 +777,26 @@ public class UsersManager implements AbstractUsersManager {
         lQueue.getMinimumAllocation());
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("User limit computation for " + userName + " in queue "
-          + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
-          + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
-          + required + " consumed: " + consumed + " user-limit-resource: "
-          + userLimitResource + " queueCapacity: " + queueCapacity
-          + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
-          + " currentCapacity: " + currentCapacity + " activeUsers: "
-          + usersCount + " clusterCapacity: " + clusterResource
-          + " resourceByLabel: " + partitionResource + " usageratio: "
-          + getUsageRatio(nodePartition) + " Partition: " + nodePartition);
+      LOG.debug("User limit computation for " + userName
+          + ", in queue: " + lQueue.getQueueName()
+          + ", userLimitPercent=" + lQueue.getUserLimit()
+          + ", userLimitFactor=" + lQueue.getUserLimitFactor()
+          + ", required=" + required
+          + ", consumed=" + consumed
+          + ", user-limit-resource=" + userLimitResource
+          + ", queueCapacity=" + queueCapacity
+          + ", qconsumed=" + lQueue.getQueueResourceUsage().getUsed()
+          + ", currentCapacity=" + currentCapacity
+          + ", activeUsers=" + usersSummedByWeight
+          + ", clusterCapacity=" + clusterResource
+          + ", resourceByLabel=" + partitionResource
+          + ", usageratio=" + getUsageRatio(nodePartition)
+          + ", Partition=" + nodePartition
+          + ", resourceUsed=" + resourceUsed
+          + ", maxUserLimit=" + maxUserLimit
+          + ", userWeight=" + getUser(userName).getWeight()
+      );
     }
-    getUser(userName).setUserResourceLimit(userLimitResource);
     return userLimitResource;
   }
 
@@ -838,6 +896,32 @@ public class UsersManager implements AbstractUsersManager {
     return activeUsers.get();
   }
 
+  float sumActiveUsersTimesWeights() {
+    float count = 0.0f;
+    try {
+      this.readLock.lock();
+      for (String u : activeUsersSet) {
+        count += getUser(u).getWeight();
+      }
+      return count;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  float sumAllUsersTimesWeights() {
+    float count = 0.0f;
+    try {
+      this.readLock.lock();
+      for (String u : users.keySet()) {
+        count += getUser(u).getWeight();
+      }
+      return count;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   private void updateActiveUsersResourceUsage(String userName) {
     try {
       this.writeLock.lock();
@@ -850,6 +934,7 @@ public class UsersManager implements AbstractUsersManager {
       if (nonActiveUsersSet.contains(userName)) {
         nonActiveUsersSet.remove(userName);
         activeUsersSet.add(userName);
+        activeUsersTimesWeights = sumActiveUsersTimesWeights();
 
         // Update total resource usage of active and non-active after user
         // is moved from non-active to active.
@@ -890,6 +975,7 @@ public class UsersManager implements AbstractUsersManager {
       if (activeUsersSet.contains(userName)) {
        activeUsersSet.remove(userName);
         nonActiveUsersSet.add(userName);
+        activeUsersTimesWeights = sumActiveUsersTimesWeights();
 
         // Update total resource usage of active and non-active after user is
         // moved from active to non-active.
@@ -990,4 +1076,18 @@ public class UsersManager implements AbstractUsersManager {
           + totalResUsageForNonActiveUsers.getAllUsed());
     }
   }
+
+  public void updateUserWeights() {
+    try {
+      this.writeLock.lock();
+      for (Map.Entry<String, User> ue : users.entrySet()) {
+        ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
+      }
+      activeUsersTimesWeights = sumActiveUsersTimesWeights();
+      allUsersTimesWeights = sumAllUsersTimesWeights();
+      userLimitNeedsRecompute();
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }
@@ -881,8 +881,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         .append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
     diagnosticMessage.append("; ");
     diagnosticMessage.append("User AM Resource Limit of the queue = ");
-    diagnosticMessage.append(
-        queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName));
+    diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition(
+        appAMNodePartitionName, getUser()));
     diagnosticMessage.append("; ");
     diagnosticMessage.append("Queue AM Resource Usage = ");
     diagnosticMessage.append(
@@ -68,6 +68,7 @@ class CapacitySchedulerPage extends RmView {
       "left:0%;background:none;border:1px dashed #BFBFBF";
   static final String Q_OVER = "background:#FFA333";
   static final String Q_UNDER = "background:#5BD75B";
+  static final String ACTIVE_USER = "background:#FFFF00"; // Yellow highlight
 
   @RequestScoped
   static class CSQInfo {
@@ -209,6 +210,7 @@ class CapacitySchedulerPage extends RmView {
       html.table("#userinfo").thead().$class("ui-widget-header").tr().th()
           .$class("ui-state-default")._("User Name")._().th()
           .$class("ui-state-default")._("Max Resource")._().th()
+          .$class("ui-state-default")._("Weight")._().th()
           .$class("ui-state-default")._("Used Resource")._().th()
           .$class("ui-state-default")._("Max AM Resource")._().th()
           .$class("ui-state-default")._("Used AM Resource")._().th()
@@ -229,8 +231,11 @@ class CapacitySchedulerPage extends RmView {
         ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
             ? new ResourceInfo(Resources.none())
             : resourceUsages.getAmUsed();
-        tbody.tr().td(userInfo.getUsername())
+        String highlightIfAsking =
+            userInfo.getIsActive() ? ACTIVE_USER : null;
+        tbody.tr().$style(highlightIfAsking).td(userInfo.getUsername())
             .td(userInfo.getUserResourceLimit().toString())
+            .td(String.valueOf(userInfo.getUserWeight()))
             .td(resourcesUsed.toString())
             .td(resourceUsages.getAMLimit().toString())
             .td(amUsed.toString())
@@ -399,6 +404,8 @@ class CapacitySchedulerPage extends RmView {
           _("Used (over capacity)")._().
         span().$class("qlegend ui-corner-all ui-state-default").
           _("Max Capacity")._().
+        span().$class("qlegend ui-corner-all").$style(ACTIVE_USER).
+          _("Users Requesting Resources")._().
       _();
 
     float used = 0;
@@ -955,7 +955,130 @@ public class TestLeafQueue {
     // app_0 doesn't have outstanding resources, there's only one active user.
     assertEquals("There should only be 1 active user!",
         1, a.getAbstractUsersManager().getNumActiveUsers());
+  }
+
+  @Test
+  public void testUserSpecificUserLimits() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Set minimum-user-limit-percent for queue "a" in the configs.
+    csConf.setUserLimit(a.getQueuePath(), 50);
+    // Set weight for "user_0" to be 1.5 for the a queue in the configs.
+    csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
+        + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT,
+        1.5f);
+
+    when(csContext.getClusterResource())
+        .thenReturn(Resources.createResource(16 * GB, 32));
+    // Verify that configs were updated and parsed correctly.
+    Assert.assertNull(a.getUserWeights().get("user_0"));
+    a.reinitialize(a, csContext.getClusterResource());
+    assertEquals(1.5, a.getUserWeights().get("user_0").floatValue(), 0.0);
+
+    // set maxCapacity
+    a.setMaxCapacity(1.0f);
+
+    // Set minimum user-limit-percent
+    a.setUserLimit(50);
+    a.setUserLimitFactor(2);
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Set user_0's weight to 1.5 in the a queue's object.
+    a.getUsersManager().getUserAndAddIfAbsent(user_0).setWeight(1.5f);
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            a.getAbstractUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+            a.getAbstractUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_1); // different user
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+    final int numNodes = 2;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    // app_0 asks for 3 3-GB containers
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 3, true,
+            priority, recordFactory)));
+
+    // app_1 asks for 2 1-GB containers
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory)));
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
+
+    /**
+     * Start testing...
+     */
+
+    // There're two active users
+    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
+
+    // 1 container to user_0. Since queue starts out empty, user limit would
+    // normally be calculated to be the minumum container size (1024GB).
+    // However, in this case, user_0 has a weight of 1.5, so the UL is 2048GB
+    // because 1024 * 1.5 rounded up to container size is 2048GB.
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(4*GB, a.getUsedResources().getMemorySize());
+    assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    // At this point the queue-wide user limit is 3072GB, but since user_0 has a
+    // weight of 1.5, its user limit is 5120GB. So, even though user_0 already
+    // has 4096GB, it is under its user limit, so it gets another container.
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(8*GB, a.getUsedResources().getMemorySize());
+    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    // Queue-wide user limit at this point is 4069GB and user_0's user limit is
+    // 6144GB. user_0 has 8192GB.
+    // Now that user_0 is above its user limit, the next container should go to user_1
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+            new ResourceLimits(clusterResource),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(9*GB, a.getUsedResources().getMemorySize());
+    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    assertEquals(4*GB,
+        app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize());
+
+    assertEquals(1*GB,
+        app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -124,6 +124,7 @@ Configuration
 | `yarn.scheduler.capacity.<queue-path>.user-limit-factor` | The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle the cluster is. Value is specified as a float. |
 | `yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb` | The per queue maximum limit of memory to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-mb`. This value must be smaller than or equal to the cluster maximum. |
 | `yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores` | The per queue maximum limit of virtual cores to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-vcores`. This value must be smaller than or equal to the cluster maximum. |
+| `yarn.scheduler.capacity.<queue-path>.user-settings.<user-name>.weight` | This floating point value is used when calculating the user limit resource values for users in a queue. This value will weight each user more or less than the other users in the queue. For example, if user A should receive 50% more resources in a queue than users B and C, this property will be set to 1.5 for user A. Users B and C will default to 1.0. |
 
 * Running and Pending Application Limits
 
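As an illustration of the new property (values invented): setting `yarn.scheduler.capacity.root.default.user-settings.alice.weight` to `1.5` in capacity-scheduler.xml gives user alice 1.5 times the user-limit resource computed for a default-weight (1.0) user in `root.default`; weights outside the range 0 to 100 / minimum-user-limit-percent are rejected when the queue is initialized or reinitialized.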