YARN-5892. Support user-specific minimum user limit percentage in Capacity Scheduler. Contributed by Eric Payne.
commit e5ada740ff (parent 9a168ae884)
@@ -61,6 +61,12 @@ public class DefaultResourceCalculator extends ResourceCalculator {
         divideAndCeil(numerator.getMemorySize(), denominator));
   }
 
+  @Override
+  public Resource divideAndCeil(Resource numerator, float denominator) {
+    return Resources.createResource(
+        divideAndCeil(numerator.getMemorySize(), denominator));
+  }
+
   @Override
   public Resource normalize(Resource r, Resource minimumResource,
       Resource maximumResource, Resource stepFactor) {
@@ -149,6 +149,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
         );
   }
 
+  @Override
+  public Resource divideAndCeil(Resource numerator, float denominator) {
+    return Resources.createResource(
+        divideAndCeil(numerator.getMemorySize(), denominator),
+        divideAndCeil(numerator.getVirtualCores(), denominator)
+        );
+  }
+
   @Override
   public Resource normalize(Resource r, Resource minimumResource,
       Resource maximumResource, Resource stepFactor) {
@@ -38,6 +38,13 @@ public abstract class ResourceCalculator {
     return (a + (b - 1)) / b;
   }
 
+  public static int divideAndCeil(int a, float b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (int) Math.ceil(a / b);
+  }
+
   public static long divideAndCeil(long a, long b) {
     if (b == 0) {
       return 0;

@@ -45,6 +52,13 @@ public abstract class ResourceCalculator {
     return (a + (b - 1)) / b;
   }
 
+  public static long divideAndCeil(long a, float b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (long) Math.ceil(a/b);
+  }
+
   public static int roundUp(int a, int b) {
     return divideAndCeil(a, b) * b;
   }
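Illustrative sketch (not part of the commit): the float overloads of divideAndCeil added above exist so that a resource can be divided by a weighted, possibly fractional, user count (for example 2.5 when one active user has weight 1.5). A minimal standalone sketch of the arithmetic, with assumed numbers:

    public class DivideAndCeilSketch {
      // Mirrors ResourceCalculator.divideAndCeil(long, float) as added above.
      static long divideAndCeil(long a, float b) {
        if (b == 0) {
          return 0;
        }
        return (long) Math.ceil(a / b);
      }

      public static void main(String[] args) {
        // 12288 MB of capacity split across users whose weights sum to 2.5:
        System.out.println(divideAndCeil(12288L, 2.5f)); // 4916, rounded up
        // The zero-denominator guard returns 0 rather than dividing by zero.
        System.out.println(divideAndCeil(12288L, 0.0f)); // 0
      }
    }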
@@ -186,6 +200,15 @@ public abstract class ResourceCalculator {
    */
   public abstract Resource divideAndCeil(Resource numerator, int denominator);
 
+  /**
+   * Divide-and-ceil <code>numerator</code> by <code>denominator</code>.
+   *
+   * @param numerator numerator resource
+   * @param denominator denominator
+   * @return resultant resource
+   */
+  public abstract Resource divideAndCeil(Resource numerator, float denominator);
+
   /**
    * Check if a smaller resource can be contained by bigger resource.
    */
@@ -270,6 +270,11 @@ public class Resources {
     return resourceCalculator.divideAndCeil(lhs, rhs);
   }
 
+  public static Resource divideAndCeil(
+      ResourceCalculator resourceCalculator, Resource lhs, float rhs) {
+    return resourceCalculator.divideAndCeil(lhs, rhs);
+  }
+
   public static boolean equals(Resource lhs, Resource rhs) {
     return lhs.equals(rhs);
   }
@@ -43,6 +43,7 @@ public class ActiveUsersManager {
   private final QueueMetrics metrics;
 
   private int activeUsers = 0;
+  private boolean activeUsersChanged = false;
   private Map<String, Set<ApplicationId>> usersApplications =
       new HashMap<String, Set<ApplicationId>>();
 

@@ -65,6 +66,7 @@ public class ActiveUsersManager {
       usersApplications.put(user, userApps);
       ++activeUsers;
       metrics.incrActiveUsers();
+      activeUsersChanged = true;
       LOG.debug("User " + user + " added to activeUsers, currently: " +
           activeUsers);
     }

@@ -91,6 +93,7 @@ public class ActiveUsersManager {
         usersApplications.remove(user);
         --activeUsers;
         metrics.decrActiveUsers();
+        activeUsersChanged = true;
         LOG.debug("User " + user + " removed from activeUsers, currently: " +
             activeUsers);
       }

@@ -106,4 +109,30 @@ public class ActiveUsersManager {
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }
+
+  /**
+   * Get list of active users
+   * @return a copy of the list of active users
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public Set<String> getActiveUsersSet() {
+    return new HashSet<String>(usersApplications.keySet());
+  }
+
+  /**
+   * Get indicator of whether or not the active users list has changed.
+   * @return active users changed indicator
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public boolean getActiveUsersChanged() {
+    return activeUsersChanged;
+  }
+
+  /**
+   * Clear active users changed indicator
+   */
+  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  synchronized public void clearActiveUsersChanged() {
+    activeUsersChanged = false;
+  }
 }
@@ -89,6 +89,7 @@ public abstract class AbstractCSQueue implements CSQueue {
       RecordFactoryProvider.getRecordFactory(null);
   protected CapacitySchedulerContext csContext;
   protected YarnAuthorizationProvider authorizer = null;
+  private Map<String, Float> userWeights = new HashMap<String, Float>();
 
   public AbstractCSQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {

@@ -294,6 +295,22 @@ public abstract class AbstractCSQueue implements CSQueue {
           .getReservationContinueLook();
 
     this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
+    this.userWeights = getUserWeightsFromHierarchy();
   }
 
+  private Map<String, Float> getUserWeightsFromHierarchy() throws IOException {
+    Map<String, Float> unionInheritedWeights = new HashMap<String, Float>();
+    CSQueue parentQ = getParent();
+    if (parentQ != null) {
+      // Inherit all of parent's user's weights
+      unionInheritedWeights.putAll(parentQ.getUserWeights());
+    }
+    // Insert this queue's user's weights, overriding parent's user's weights if
+    // there is overlap.
+    CapacitySchedulerConfiguration csConf = csContext.getConfiguration();
+    unionInheritedWeights.putAll(
+        csConf.getAllUserWeightsForQueue(getQueuePath()));
+    return unionInheritedWeights;
+  }
+
   protected QueueInfo getQueueInfo() {

@@ -689,4 +706,9 @@ public abstract class AbstractCSQueue implements CSQueue {
     return csContext.getPreemptionManager().getKillableContainers(queueName,
         partition);
   }
+
+  @Override
+  public Map<String, Float> getUserWeights() {
+    return userWeights;
+  }
 }
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;

@@ -338,4 +339,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @return valid node labels
    */
   public Set<String> getNodeLabelsForQueue();
+
+  /**
+   * Get a map of usernames and weights
+   * @return map of usernames and corresponding weight
+   */
+  Map<String, Float> getUserWeights();
 }
@@ -103,6 +103,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
   @Private
   public static final String USER_LIMIT_FACTOR = "user-limit-factor";
 
+  @Private
+  public static final String USER_WEIGHT = "weight";
+
+  @Private
+  public static final String USER_SETTINGS = "user-settings";
+
+  @Private
+  public static final float DEFAULT_USER_WEIGHT = 1.0f;
+
   @Private
   public static final String STATE = "state";
 
@@ -1137,4 +1146,29 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
   public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX
       + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
   public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
+
+  /**
+   * Get the weights of all users at this queue level from the configuration.
+   * Used in computing user-specific user limit, relative to other users.
+   * @param queuePath full queue path
+   * @return map of user weights, if they exist. Otherwise, return empty map.
+   */
+  public Map<String, Float> getAllUserWeightsForQueue(String queuePath) {
+    Map<String, Float> userWeights = new HashMap<String, Float>();
+    String qPathPlusPrefix =
+        getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.")
+        + USER_SETTINGS + "\\.";
+    String weightKeyRegex =
+        qPathPlusPrefix + "\\w+\\." + USER_WEIGHT;
+    Map<String, String> props = getValByRegex(weightKeyRegex);
+    for (Entry<String, String> e : props.entrySet()) {
+      String userName =
+          e.getKey().replaceFirst(qPathPlusPrefix, "")
+          .replaceFirst("\\." + USER_WEIGHT, "");
+      if (userName != null && !userName.isEmpty()) {
+        userWeights.put(userName, new Float(e.getValue()));
+      }
+    }
+    return userWeights;
+  }
 }
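Illustration (not part of the commit): getAllUserWeightsForQueue() above picks up properties of the form yarn.scheduler.capacity.<queue-path>.user-settings.<user-name>.weight. A hedged sketch of setting and reading back such a property, using a made-up queue "root.default" and user "alice":

    import java.util.Map;

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

    public class UserWeightConfigSketch {
      public static void main(String[] args) {
        CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
        // Hypothetical queue "root.default" and user "alice"; 1.5 means alice's
        // user limit is computed as 1.5x that of an unweighted (1.0) user.
        conf.setFloat("yarn.scheduler.capacity.root.default.user-settings.alice."
            + CapacitySchedulerConfiguration.USER_WEIGHT, 1.5f);

        Map<String, Float> weights = conf.getAllUserWeightsForQueue("root.default");
        System.out.println(weights); // expected: {alice=1.5}
      }
    }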
@@ -77,8 +77,10 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

@@ -137,6 +139,11 @@ public class LeafQueue extends AbstractCSQueue {
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
       new HashMap<>();
 
+  private Set<String> activeUsersSet =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+  private float activeUsersTimesWeights = 0.0f;
+  private float allUsersTimesWeights = 0.0f;
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -231,6 +238,20 @@ public class LeafQueue extends AbstractCSQueue {
     defaultAppPriorityPerQueue = Priority.newInstance(conf
         .getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
 
+    // Validate leaf queue's user's weights.
+    int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
+    for (Entry<String, Float> e : getUserWeights().entrySet()) {
+      float val = e.getValue().floatValue();
+      if (val < 0.0f || val > (100.0f / queueUL)) {
+        throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
+            + "\" must be between 0 and" + " 100 / " + queueUL + " (= " +
+            100.0f/queueUL + ", the number of concurrent active users in "
+            + getQueuePath() + ")");
+      }
+    }
+
+    updateUserWeights();
+
     LOG.info("Initializing " + queueName + "\n" +
         "capacity = " + queueCapacities.getCapacity() +
         " [= (float) configuredCapacity / 100 ]" + "\n" +
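The weight bound enforced in this hunk follows directly from the queue's minimum-user-limit-percent: a weight may not push a single user past the whole queue. A small sketch of the same check with assumed numbers (user-limit-percent 50, so the maximum weight is 100 / 50 = 2.0):

    import java.io.IOException;

    public class UserWeightBoundSketch {
      // Mirrors the validation above: weight must lie in [0, 100 / user-limit-percent].
      static void validate(float weight, int userLimitPercent) throws IOException {
        int queueUL = Math.min(100, userLimitPercent);
        if (weight < 0.0f || weight > (100.0f / queueUL)) {
          throw new IOException("Weight " + weight + " out of range [0, "
              + (100.0f / queueUL) + "]");
        }
      }

      public static void main(String[] args) throws IOException {
        validate(1.5f, 50); // accepted
        validate(2.5f, 50); // throws: 2.5 > 100 / 50
      }
    }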
@@ -279,6 +300,16 @@ public class LeafQueue extends AbstractCSQueue {
         "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue);
   }
 
+  // This must be called from a synchronized method.
+  private void updateUserWeights() {
+    activeUsersSet = activeUsersManager.getActiveUsersSet();
+    for (Map.Entry<String, User> ue : users.entrySet()) {
+      ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
+    }
+    activeUsersTimesWeights = sumActiveUsersTimesWeights();
+    allUsersTimesWeights = sumAllUsersTimesWeights();
+  }
+
   @Override
   public String getQueuePath() {
     return getParent().getQueuePath() + "." + getQueueName();
@@ -423,10 +454,17 @@ public class LeafQueue extends AbstractCSQueue {
     if (user == null) {
       user = new User(userName);
       users.put(userName, user);
+      user.setWeight(getUserWeightFromQueue(userName));
+      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
     return user;
   }
 
+  private float getUserWeightFromQueue(String userName) {
+    Float weight = getUserWeights().get(userName);
+    return (weight == null) ? 1.0f : weight.floatValue();
+  }
+
   /**
    * @return an ArrayList of UserInfo objects who are active in this queue
    */
@@ -438,7 +476,8 @@ public class LeafQueue extends AbstractCSQueue {
           .getAllUsed()), user.getActiveApplications(), user
           .getPendingApplications(), Resources.clone(user
           .getConsumedAMResources()), Resources.clone(user
-          .getUserResourceLimit()), user.getResourceUsage()));
+          .getUserResourceLimit()), user.getResourceUsage(),
+          user.getWeight(), activeUsersSet.contains(user.userName)));
     }
     return usersToReturn;
   }
@@ -565,19 +604,36 @@ public class LeafQueue extends AbstractCSQueue {
 
   @VisibleForTesting
   public synchronized Resource getUserAMResourceLimit() {
-    return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
+    return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
+        null);
   }
 
   public synchronized Resource getUserAMResourceLimitPerPartition(
-      String nodePartition) {
+      String nodePartition, String userName) {
+    float userWeight = 1.0f;
+    if (userName != null && getUser(userName) != null) {
+      userWeight = getUser(userName).getWeight();
+    }
+    if (activeUsersManager.getActiveUsersChanged()) {
+      activeUsersSet = activeUsersManager.getActiveUsersSet();
+      activeUsersTimesWeights = sumActiveUsersTimesWeights();
+      activeUsersManager.clearActiveUsersChanged();
+    }
     /*
      * The user am resource limit is based on the same approach as the user
      * limit (as it should represent a subset of that). This means that it uses
      * the absolute queue capacity (per partition) instead of the max and is
      * modified by the userlimit and the userlimit factor as is the userlimit
      */
-    float effectiveUserLimit = Math.max(userLimit / 100.0f,
+    float effectiveUserLimit;
+    if (activeUsersTimesWeights > 0.0f) {
+      effectiveUserLimit = Math.max(userLimit / 100.0f,
+          1.0f / activeUsersTimesWeights);
+    } else {
+      effectiveUserLimit = Math.max(userLimit / 100.0f,
         1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+    }
+    effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
 
     Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
         resourceCalculator,
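With weights in play, the per-user AM limit above scales the effective user-limit fraction by the requesting user's weight and caps it at 1.0; the method then multiplies this fraction into the queue's per-partition AM resource. A worked sketch with assumed numbers (user-limit-percent 50, active users' weights summing to 2.5, requesting user's weight 1.5):

    public class UserAmLimitFractionSketch {
      public static void main(String[] args) {
        float userLimit = 50f;                // minimum-user-limit-percent (assumed)
        float activeUsersTimesWeights = 2.5f; // e.g. active weights 1.5 + 1.0
        float userWeight = 1.5f;              // weight of the user being checked

        float effectiveUserLimit =
            Math.max(userLimit / 100.0f, 1.0f / activeUsersTimesWeights); // max(0.5, 0.4) = 0.5
        effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f); // 0.75

        // This user may take up to 75% of the queue's per-partition AM headroom,
        // versus 50% for a user with the default weight of 1.0.
        System.out.println(effectiveUserLimit);
      }
    }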
@@ -704,7 +760,8 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Verify whether we already calculated user-am-limit for this label.
       if (userAMLimit == null) {
-        userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
+        userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
+            application.getUser());
         userAmPartitionLimit.put(partitionName, userAMLimit);
       }
 
@@ -819,6 +876,7 @@ public class LeafQueue extends AbstractCSQueue {
     user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
       users.remove(application.getUser());
+      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
 
     // Check if we can activate more applications
@@ -1245,20 +1303,25 @@ public class LeafQueue extends AbstractCSQueue {
     // Also, the queue's configured capacity should be higher than
     // queue-hard-limit * ulMin
 
-    final int usersCount;
+    float usersSummedByWeight;
     if (forActive) {
-      usersCount = activeUsersManager.getNumActiveUsers();
+      if (activeUsersManager.getActiveUsersChanged()) {
+        activeUsersSet = activeUsersManager.getActiveUsersSet();
+        activeUsersTimesWeights = sumActiveUsersTimesWeights();
+        activeUsersManager.clearActiveUsersChanged();
+      }
+      usersSummedByWeight = activeUsersTimesWeights;
     } else {
-      usersCount = users.size();
+      usersSummedByWeight = allUsersTimesWeights;
     }
 
     // User limit resource is determined by:
-    // max{currentCapacity / #activeUsers, currentCapacity *
+    // max(currentCapacity / #activeUsers, currentCapacity *
     // user-limit-percentage%)
     Resource userLimitResource = Resources.max(
         resourceCalculator, partitionResource,
         Resources.divideAndCeil(
-            resourceCalculator, currentCapacity, usersCount),
+            resourceCalculator, currentCapacity, usersSummedByWeight),
         Resources.divideAndCeil(
             resourceCalculator,
             Resources.multiplyAndRoundDown(
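The effect of dividing by usersSummedByWeight, together with the user-weight multiplication shown in the next hunk, can be seen with plain numbers. A memory-only sketch under assumed values; the real code operates on Resource objects through Resources.divideAndCeil and Resources.multiplyAndNormalizeDown and also applies the max-user-limit cap, which is omitted here:

    public class WeightedUserLimitSketch {
      public static void main(String[] args) {
        long currentCapacityMb = 12288;   // assumed current capacity, in MB
        float userLimitPercent = 50f;     // minimum-user-limit-percent (assumed)
        float usersSummedByWeight = 2.5f; // e.g. active weights 1.5 + 1.0
        float userWeight = 1.5f;          // weight of the user being computed
        long minimumAllocationMb = 1024;  // assumed minimum container size

        // max(currentCapacity / weighted-user-count, currentCapacity * user-limit-percent%)
        long perWeightedUser = (long) Math.ceil(currentCapacityMb / usersSummedByWeight); // 4916
        long byPercent = (long) Math.ceil(currentCapacityMb * userLimitPercent / 100f);   // 6144
        long userLimitMb = Math.max(perWeightedUser, byPercent);                          // 6144

        // Apply the user's weight, normalizing down to a multiple of the minimum allocation.
        long weighted = (long) (userLimitMb * userWeight);                        // 9216
        long normalized = (weighted / minimumAllocationMb) * minimumAllocationMb; // 9216
        System.out.println(normalized);
      }
    }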
@@ -1306,19 +1369,46 @@ public class LeafQueue extends AbstractCSQueue {
           " qconsumed: " + queueUsage.getUsed() +
           " consumedRatio: " + totalUserConsumedRatio +
           " currentCapacity: " + currentCapacity +
-          " activeUsers: " + usersCount +
+          " activeUsers: " + usersSummedByWeight +
           " clusterCapacity: " + clusterResource +
           " resourceByLabel: " + partitionResource +
           " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
-          " Partition: " + nodePartition
+          " Partition: " + nodePartition +
+          " maxUserLimit=" + maxUserLimit +
+          " userWeight=" + ((user != null) ? user.getWeight() : 1.0f)
       );
     }
+    // Apply user's weight.
+    float weight = (user == null) ? 1.0f : user.getWeight();
+    userLimitResource =
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            userLimitResource, weight, minimumAllocation);
+
     if (forActive) {
       user.setUserResourceLimit(userLimitResource);
     }
     return userLimitResource;
   }
 
+  float sumActiveUsersTimesWeights() {
+    float count = 0.0f;
+    for (String userName : activeUsersSet) {
+      // Do the following instead of calling getUser to avoid synchronization.
+      User user = users.get(userName);
+      count += (user != null) ? user.getWeight() : 0.0f;
+    }
+    return count;
+  }
+
+  synchronized float sumAllUsersTimesWeights() {
+    float count = 0.0f;
+    for (String userName : users.keySet()) {
+      User user = getUser(userName);
+      count += user.getWeight();
+    }
+    return count;
+  }
+
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, Resource rsrv,
@@ -1758,6 +1848,7 @@ public class LeafQueue extends AbstractCSQueue {
     int activeApplications = 0;
     private UsageRatios userUsageRatios = new UsageRatios();
     String userName;
+    float weight = 1.0f;
 
     public User(String name) {
       this.userName = name;

@@ -1859,6 +1950,20 @@ public class LeafQueue extends AbstractCSQueue {
     public void setResourceUsage(ResourceUsage resourceUsage) {
       this.userResourceUsage = resourceUsage;
     }
+
+    /**
+     * @return the weight
+     */
+    public float getWeight() {
+      return weight;
+    }
+
+    /**
+     * @param weight the weight to set
+     */
+    public void setWeight(float weight) {
+      this.weight = weight;
+    }
   }
 
   @Override
@@ -37,11 +37,14 @@ public class UserInfo {
   protected ResourceInfo AMResourceUsed;
   protected ResourceInfo userResourceLimit;
   protected ResourcesInfo resources;
+  private float userWeight;
+  private boolean isActive;
 
   UserInfo() {}
 
   UserInfo(String username, Resource resUsed, int activeApps, int pendingApps,
-      Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) {
+      Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage,
+      float weight, boolean isActive) {
     this.username = username;
     this.resourcesUsed = new ResourceInfo(resUsed);
     this.numActiveApplications = activeApps;

@@ -49,6 +52,8 @@ public class UserInfo {
     this.AMResourceUsed = new ResourceInfo(amResUsed);
     this.userResourceLimit = new ResourceInfo(resourceLimit);
     this.resources = new ResourcesInfo(resourceUsage);
+    this.userWeight = weight;
+    this.isActive = isActive;
   }
 
   public String getUsername() {

@@ -78,4 +83,12 @@ public class UserInfo {
   public ResourcesInfo getResourceUsageInfo() {
     return resources;
   }
+
+  public float getUserWeight() {
+    return userWeight;
+  }
+
+  public boolean getIsActive() {
+    return isActive;
+  }
 }
@@ -559,8 +559,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           .append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
       diagnosticMessage.append("; ");
       diagnosticMessage.append("User AM Resource Limit of the queue = ");
-      diagnosticMessage.append(
-          queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName));
+      diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition(
+          appAMNodePartitionName, getUser()));
       diagnosticMessage.append("; ");
       diagnosticMessage.append("Queue AM Resource Usage = ");
       diagnosticMessage.append(
@@ -67,6 +67,7 @@ class CapacitySchedulerPage extends RmView {
       "left:0%;background:none;border:1px dashed #BFBFBF";
   static final String Q_OVER = "background:#FFA333";
   static final String Q_UNDER = "background:#5BD75B";
+  static final String ACTIVE_USER = "background:#FFFF00"; // Yellow highlight
 
   @RequestScoped
   static class CSQInfo {

@@ -208,6 +209,7 @@ class CapacitySchedulerPage extends RmView {
       html.table("#userinfo").thead().$class("ui-widget-header").tr().th()
           .$class("ui-state-default")._("User Name")._().th()
           .$class("ui-state-default")._("Max Resource")._().th()
+          .$class("ui-state-default")._("Weight")._().th()
           .$class("ui-state-default")._("Used Resource")._().th()
           .$class("ui-state-default")._("Max AM Resource")._().th()
           .$class("ui-state-default")._("Used AM Resource")._().th()

@@ -228,8 +230,11 @@ class CapacitySchedulerPage extends RmView {
         ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
             ? new ResourceInfo(Resources.none())
             : resourceUsages.getAmUsed();
-        tbody.tr().td(userInfo.getUsername())
+        String highlightIfAsking =
+            userInfo.getIsActive() ? ACTIVE_USER : null;
+        tbody.tr().$style(highlightIfAsking).td(userInfo.getUsername())
             .td(userInfo.getUserResourceLimit().toString())
+            .td(String.valueOf(userInfo.getUserWeight()))
             .td(resourcesUsed.toString())
             .td(resourceUsages.getAMLimit().toString())
             .td(amUsed.toString())

@@ -397,6 +402,8 @@ class CapacitySchedulerPage extends RmView {
           _("Used (over capacity)")._().
           span().$class("qlegend ui-corner-all ui-state-default").
           _("Max Capacity")._().
+          span().$class("qlegend ui-corner-all").$style(ACTIVE_USER).
+          _("Users Requesting Resources")._().
           _();
 
       float used = 0;
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;

@@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -3380,4 +3382,125 @@ public class TestLeafQueue {
       cs.stop();
     }
   }
+
+  @Test
+  public void testUserSpecificUserLimits() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Set minimum-user-limit-percent for queue "a" in the configs.
+    csConf.setUserLimit(a.getQueuePath(), 50);
+    // Set weight for "user_0" to be 1.5 for the a queue in the configs.
+    csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
+        + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT,
+        1.5f);
+
+    when(csContext.getClusterResource())
+        .thenReturn(Resources.createResource(16 * GB, 32));
+    // Verify that configs were updated and parsed correctly.
+    Assert.assertNull(a.getUserWeights().get("user_0"));
+    a.reinitialize(a, csContext.getClusterResource());
+    assertEquals(1.5, a.getUserWeights().get("user_0").floatValue(), 0.0);
+
+    // set maxCapacity
+    a.setMaxCapacity(1.0f);
+
+    // Set minimum user-limit-percent
+    a.setUserLimit(50);
+    a.setUserLimitFactor(2);
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Set user_0's weight to 1.5 in the a queue's object.
+    a.getUser(user_0).setWeight(1.5f);
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+            a.getActiveUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_1); // different user
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+    final int numNodes = 2;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    // app_0 asks for 3 4-GB containers
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 3, true,
+            priority, recordFactory)));
+
+    // app_1 asks for 2 1-GB containers
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory)));
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
+
+    /**
+     * Start testing...
+     */
+
+    // There're two active users
+    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+
+    // 1 container to user_0. Since queue starts out empty, user limit would
+    // normally be calculated to be the minimum container size (1024MB).
+    // However, in this case, user_0 has a weight of 1.5, so the UL is 2048MB
+    // because 1024 * 1.5 rounded up to container size is 2048MB.
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(4*GB, a.getUsedResources().getMemorySize());
+    assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    // At this point the queue-wide user limit is 3072MB, but since user_0 has a
+    // weight of 1.5, its user limit is 5120MB. So, even though user_0 already
+    // has 4096MB, it is under its user limit, so it gets another container.
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(8*GB, a.getUsedResources().getMemorySize());
+    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    // Queue-wide user limit at this point is 4069MB and user_0's user limit is
+    // 6144MB. user_0 has 8192MB.
+    // Now that user_0 is above its user limit, the next container should go to user_1
+    a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(9*GB, a.getUsedResources().getMemorySize());
+    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    assertEquals(4*GB,
+        app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize());
+
+    assertEquals(1*GB,
+        app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
+  }
 }
@@ -136,6 +136,7 @@ Configuration
 | `yarn.scheduler.capacity.<queue-path>.user-limit-factor` | The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle the cluster is. Value is specified as a float. |
 | `yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb` | The per queue maximum limit of memory to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-mb`. This value must be smaller than or equal to the cluster maximum. |
 | `yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores` | The per queue maximum limit of virtual cores to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-vcores`. This value must be smaller than or equal to the cluster maximum. |
+| `yarn.scheduler.capacity.<queue-path>.user-settings.<user-name>.weight` | This floating point value is used when calculating the user limit resource values for users in a queue. This value will weight each user more or less than the other users in the queue. For example, if user A should receive 50% more resources in a queue than users B and C, this property will be set to 1.5 for user A. Users B and C will default to 1.0. |
 
 * Running and Pending Application Limits
 
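Illustrative example (not part of the commit): assuming a queue `root.default` and a user named `userA`, the weight described in the new table row would be set in `capacity-scheduler.xml` along these lines:

    <property>
      <name>yarn.scheduler.capacity.root.default.user-settings.userA.weight</name>
      <value>1.5</value>
    </property>

All other users in `root.default` would keep the default weight of 1.0.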