diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index fef5912b92a..cc2c1aefd31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -61,6 +61,12 @@ public class DefaultResourceCalculator extends ResourceCalculator { divideAndCeil(numerator.getMemorySize(), denominator)); } + @Override + public Resource divideAndCeil(Resource numerator, float denominator) { + return Resources.createResource( + divideAndCeil(numerator.getMemorySize(), denominator)); + } + @Override public Resource normalize(Resource r, Resource minimumResource, Resource maximumResource, Resource stepFactor) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 1472f0ea040..a31e5459fa5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -149,6 +149,14 @@ public class DominantResourceCalculator extends ResourceCalculator { ); } + @Override + public Resource divideAndCeil(Resource numerator, float denominator) { + return Resources.createResource( + divideAndCeil(numerator.getMemorySize(), denominator), + divideAndCeil(numerator.getVirtualCores(), denominator) + ); + } + @Override public Resource normalize(Resource r, Resource minimumResource, Resource maximumResource, Resource stepFactor) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index e8db845c86a..cf2d7adc1f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -37,6 +37,13 @@ public abstract class ResourceCalculator { } return (a + (b - 1)) / b; } + + public static int divideAndCeil(int a, float b) { + if (b == 0) { + return 0; + } + return (int) Math.ceil(a / b); + } public static long divideAndCeil(long a, long b) { if (b == 0) { @@ -45,6 +52,13 @@ public abstract class ResourceCalculator { return (a + (b - 1)) / b; } + public static long divideAndCeil(long a, float b) { + if (b == 0) { + return 0; + } + return (long) Math.ceil(a/b); + } + public static int roundUp(int a, int b) { return divideAndCeil(a, b) * b; } @@ -185,6 +199,15 @@ public abstract class ResourceCalculator { * @return resultant resource */ public abstract Resource divideAndCeil(Resource numerator, int denominator); + + /** + * Divide-and-ceil numerator by denominator. + * + * @param numerator numerator resource + * @param denominator denominator + * @return resultant resource + */ + public abstract Resource divideAndCeil(Resource numerator, float denominator); /** * Check if a smaller resource can be contained by bigger resource. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 4f7a1d69eed..814dcff06cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -269,6 +269,11 @@ public class Resources { ResourceCalculator resourceCalculator, Resource lhs, int rhs) { return resourceCalculator.divideAndCeil(lhs, rhs); } + + public static Resource divideAndCeil( + ResourceCalculator resourceCalculator, Resource lhs, float rhs) { + return resourceCalculator.divideAndCeil(lhs, rhs); + } public static boolean equals(Resource lhs, Resource rhs) { return lhs.equals(rhs); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java index 36e68583857..4172c4d4671 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java @@ -43,6 +43,7 @@ public class ActiveUsersManager { private final QueueMetrics metrics; private int activeUsers = 0; + private boolean activeUsersChanged = false; private Map> usersApplications = new HashMap>(); @@ -65,6 +66,7 @@ public class ActiveUsersManager { usersApplications.put(user, userApps); ++activeUsers; metrics.incrActiveUsers(); + activeUsersChanged = true; LOG.debug("User " + user + " added to activeUsers, currently: " + activeUsers); } @@ -91,6 +93,7 @@ public class ActiveUsersManager { usersApplications.remove(user); --activeUsers; metrics.decrActiveUsers(); + activeUsersChanged = true; LOG.debug("User " + user + " removed from activeUsers, currently: " + activeUsers); } @@ -106,4 +109,30 @@ public class ActiveUsersManager { synchronized public int getNumActiveUsers() { return activeUsers; } + + /** + * Get list of active users + * @return a copy of the list of active users + */ + @Lock({Queue.class, SchedulerApplicationAttempt.class}) + synchronized public Set getActiveUsersSet() { + return new HashSet(usersApplications.keySet()); + } + + /** + * Get indicator of whether or not the active users list has changed. + * @return active users changed indicator + */ + @Lock({Queue.class, SchedulerApplicationAttempt.class}) + synchronized public boolean getActiveUsersChanged() { + return activeUsersChanged; + } + + /** + * Clear active users changed indicator + */ + @Lock({Queue.class, SchedulerApplicationAttempt.class}) + synchronized public void clearActiveUsersChanged() { + activeUsersChanged = false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9c22a1219ee..0ca5b582649 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -89,6 +89,7 @@ public abstract class AbstractCSQueue implements CSQueue { RecordFactoryProvider.getRecordFactory(null); protected CapacitySchedulerContext csContext; protected YarnAuthorizationProvider authorizer = null; + private Map userWeights = new HashMap(); public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -294,6 +295,22 @@ public abstract class AbstractCSQueue implements CSQueue { .getReservationContinueLook(); this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); + this.userWeights = getUserWeightsFromHierarchy(); + } + + private Map getUserWeightsFromHierarchy() throws IOException { + Map unionInheritedWeights = new HashMap(); + CSQueue parentQ = getParent(); + if (parentQ != null) { + // Inherit all of parent's user's weights + unionInheritedWeights.putAll(parentQ.getUserWeights()); + } + // Insert this queue's user's weights, overriding parent's user's weights if + // there is overlap. + CapacitySchedulerConfiguration csConf = csContext.getConfiguration(); + unionInheritedWeights.putAll( + csConf.getAllUserWeightsForQueue(getQueuePath())); + return unionInheritedWeights; } protected QueueInfo getQueueInfo() { @@ -689,4 +706,9 @@ public abstract class AbstractCSQueue implements CSQueue { return csContext.getPreemptionManager().getKillableContainers(queueName, partition); } + + @Override + public Map getUserWeights() { + return userWeights; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 26ad131d67b..2c003aa45e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -338,4 +339,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue { * @return valid node labels */ public Set getNodeLabelsForQueue(); + + /** + * Get a map of usernames and weights + * @return map of usernames and corresponding weight + */ + Map getUserWeights(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index f2f9001fd25..38b538c1942 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -103,6 +103,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String USER_LIMIT_FACTOR = "user-limit-factor"; + @Private + public static final String USER_WEIGHT = "weight"; + + @Private + public static final String USER_SETTINGS = "user-settings"; + + @Private + public static final float DEFAULT_USER_WEIGHT = 1.0f; + @Private public static final String STATE = "state"; @@ -1137,4 +1146,29 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy"; public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first"; + + /** + * Get the weights of all users at this queue level from the configuration. + * Used in computing user-specific user limit, relative to other users. + * @param queuePath full queue path + * @return map of user weights, if they exists. Otherwise, return empty map. + */ + public Map getAllUserWeightsForQueue(String queuePath) { + Map userWeights = new HashMap (); + String qPathPlusPrefix = + getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.") + + USER_SETTINGS + "\\."; + String weightKeyRegex = + qPathPlusPrefix + "\\w+\\." + USER_WEIGHT; + Map props = getValByRegex(weightKeyRegex); + for (Entry e : props.entrySet()) { + String userName = + e.getKey().replaceFirst(qPathPlusPrefix, "") + .replaceFirst("\\." + USER_WEIGHT, ""); + if (userName != null && !userName.isEmpty()) { + userWeights.put(userName, new Float(e.getValue())); + } + } + return userWeights; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index bd3bdff1439..497e0ecef4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -77,8 +77,10 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -137,6 +139,11 @@ public class LeafQueue extends AbstractCSQueue { private Map> ignorePartitionExclusivityRMContainers = new HashMap<>(); + private Set activeUsersSet = + Collections.newSetFromMap(new ConcurrentHashMap()); + private float activeUsersTimesWeights = 0.0f; + private float allUsersTimesWeights = 0.0f; + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -231,6 +238,20 @@ public class LeafQueue extends AbstractCSQueue { defaultAppPriorityPerQueue = Priority.newInstance(conf .getDefaultApplicationPriorityConfPerQueue(getQueuePath())); + // Validate leaf queue's user's weights. + int queueUL = Math.min(100, conf.getUserLimit(getQueuePath())); + for (Entry e : getUserWeights().entrySet()) { + float val = e.getValue().floatValue(); + if (val < 0.0f || val > (100.0f / queueUL)) { + throw new IOException("Weight (" + val + ") for user \"" + e.getKey() + + "\" must be between 0 and" + " 100 / " + queueUL + " (= " + + 100.0f/queueUL + ", the number of concurrent active users in " + + getQueuePath() + ")"); + } + } + + updateUserWeights(); + LOG.info("Initializing " + queueName + "\n" + "capacity = " + queueCapacities.getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n" + @@ -279,6 +300,16 @@ public class LeafQueue extends AbstractCSQueue { "defaultAppPriorityPerQueue = " + defaultAppPriorityPerQueue); } + // This must be called from a synchronized method. + private void updateUserWeights() { + activeUsersSet = activeUsersManager.getActiveUsersSet(); + for (Map.Entry ue : users.entrySet()) { + ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey())); + } + activeUsersTimesWeights = sumActiveUsersTimesWeights(); + allUsersTimesWeights = sumAllUsersTimesWeights(); + } + @Override public String getQueuePath() { return getParent().getQueuePath() + "." + getQueueName(); @@ -423,10 +454,17 @@ public class LeafQueue extends AbstractCSQueue { if (user == null) { user = new User(userName); users.put(userName, user); + user.setWeight(getUserWeightFromQueue(userName)); + allUsersTimesWeights = sumAllUsersTimesWeights(); } return user; } + private float getUserWeightFromQueue(String userName) { + Float weight = getUserWeights().get(userName); + return (weight == null) ? 1.0f : weight.floatValue(); + } + /** * @return an ArrayList of UserInfo objects who are active in this queue */ @@ -438,7 +476,8 @@ public class LeafQueue extends AbstractCSQueue { .getAllUsed()), user.getActiveApplications(), user .getPendingApplications(), Resources.clone(user .getConsumedAMResources()), Resources.clone(user - .getUserResourceLimit()), user.getResourceUsage())); + .getUserResourceLimit()), user.getResourceUsage(), + user.getWeight(), activeUsersSet.contains(user.userName))); } return usersToReturn; } @@ -565,19 +604,36 @@ public class LeafQueue extends AbstractCSQueue { @VisibleForTesting public synchronized Resource getUserAMResourceLimit() { - return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL); + return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL, + null); } public synchronized Resource getUserAMResourceLimitPerPartition( - String nodePartition) { + String nodePartition, String userName) { + float userWeight = 1.0f; + if (userName != null && getUser(userName) != null) { + userWeight = getUser(userName).getWeight(); + } + if (activeUsersManager.getActiveUsersChanged()) { + activeUsersSet = activeUsersManager.getActiveUsersSet(); + activeUsersTimesWeights = sumActiveUsersTimesWeights(); + activeUsersManager.clearActiveUsersChanged(); + } /* * The user am resource limit is based on the same approach as the user * limit (as it should represent a subset of that). This means that it uses * the absolute queue capacity (per partition) instead of the max and is * modified by the userlimit and the userlimit factor as is the userlimit */ - float effectiveUserLimit = Math.max(userLimit / 100.0f, - 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); + float effectiveUserLimit; + if (activeUsersTimesWeights > 0.0f) { + effectiveUserLimit = Math.max(userLimit / 100.0f, + 1.0f / activeUsersTimesWeights); + } else { + effectiveUserLimit = Math.max(userLimit / 100.0f, + 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); + } + effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f); Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( resourceCalculator, @@ -704,7 +760,8 @@ public class LeafQueue extends AbstractCSQueue { // Verify whether we already calculated user-am-limit for this label. if (userAMLimit == null) { - userAMLimit = getUserAMResourceLimitPerPartition(partitionName); + userAMLimit = getUserAMResourceLimitPerPartition(partitionName, + application.getUser()); userAmPartitionLimit.put(partitionName, userAMLimit); } @@ -819,6 +876,7 @@ public class LeafQueue extends AbstractCSQueue { user.finishApplication(wasActive); if (user.getTotalApplications() == 0) { users.remove(application.getUser()); + allUsersTimesWeights = sumAllUsersTimesWeights(); } // Check if we can activate more applications @@ -1245,20 +1303,25 @@ public class LeafQueue extends AbstractCSQueue { // Also, the queue's configured capacity should be higher than // queue-hard-limit * ulMin - final int usersCount; + float usersSummedByWeight; if (forActive) { - usersCount = activeUsersManager.getNumActiveUsers(); + if (activeUsersManager.getActiveUsersChanged()) { + activeUsersSet = activeUsersManager.getActiveUsersSet(); + activeUsersTimesWeights = sumActiveUsersTimesWeights(); + activeUsersManager.clearActiveUsersChanged(); + } + usersSummedByWeight = activeUsersTimesWeights; } else { - usersCount = users.size(); + usersSummedByWeight = allUsersTimesWeights; } // User limit resource is determined by: - // max{currentCapacity / #activeUsers, currentCapacity * + // max(currentCapacity / #activeUsers, currentCapacity * // user-limit-percentage%) Resource userLimitResource = Resources.max( resourceCalculator, partitionResource, Resources.divideAndCeil( - resourceCalculator, currentCapacity, usersCount), + resourceCalculator, currentCapacity, usersSummedByWeight), Resources.divideAndCeil( resourceCalculator, Resources.multiplyAndRoundDown( @@ -1306,18 +1369,45 @@ public class LeafQueue extends AbstractCSQueue { " qconsumed: " + queueUsage.getUsed() + " consumedRatio: " + totalUserConsumedRatio + " currentCapacity: " + currentCapacity + - " activeUsers: " + usersCount + + " activeUsers: " + usersSummedByWeight + " clusterCapacity: " + clusterResource + " resourceByLabel: " + partitionResource + " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) + - " Partition: " + nodePartition + " Partition: " + nodePartition + + " maxUserLimit=" + maxUserLimit + + " userWeight=" + ((user != null) ? user.getWeight() : 1.0f) ); } + // Apply user's weight. + float weight = (user == null) ? 1.0f : user.getWeight(); + userLimitResource = + Resources.multiplyAndNormalizeDown(resourceCalculator, + userLimitResource, weight, minimumAllocation); + if (forActive) { user.setUserResourceLimit(userLimitResource); } return userLimitResource; } + + float sumActiveUsersTimesWeights() { + float count = 0.0f; + for (String userName : activeUsersSet) { + // Do the following instead of calling getUser to avoid synchronization. + User user = users.get(userName); + count += (user != null) ? user.getWeight() : 0.0f; + } + return count; + } + + synchronized float sumAllUsersTimesWeights() { + float count = 0.0f; + for (String userName : users.keySet()) { + User user = getUser(userName); + count += user.getWeight(); + } + return count; + } @Private protected synchronized boolean canAssignToUser(Resource clusterResource, @@ -1758,6 +1848,7 @@ public class LeafQueue extends AbstractCSQueue { int activeApplications = 0; private UsageRatios userUsageRatios = new UsageRatios(); String userName; + float weight = 1.0f; public User(String name) { this.userName = name; @@ -1859,6 +1950,20 @@ public class LeafQueue extends AbstractCSQueue { public void setResourceUsage(ResourceUsage resourceUsage) { this.userResourceUsage = resourceUsage; } + + /** + * @return the weight + */ + public float getWeight() { + return weight; + } + + /** + * @param weight the weight to set + */ + public void setWeight(float weight) { + this.weight = weight; + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java index ff9d304792f..a1a8ecf71b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java @@ -37,11 +37,14 @@ public class UserInfo { protected ResourceInfo AMResourceUsed; protected ResourceInfo userResourceLimit; protected ResourcesInfo resources; + private float userWeight; + private boolean isActive; UserInfo() {} UserInfo(String username, Resource resUsed, int activeApps, int pendingApps, - Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) { + Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage, + float weight, boolean isActive) { this.username = username; this.resourcesUsed = new ResourceInfo(resUsed); this.numActiveApplications = activeApps; @@ -49,6 +52,8 @@ public class UserInfo { this.AMResourceUsed = new ResourceInfo(amResUsed); this.userResourceLimit = new ResourceInfo(resourceLimit); this.resources = new ResourcesInfo(resourceUsage); + this.userWeight = weight; + this.isActive = isActive; } public String getUsername() { @@ -78,4 +83,12 @@ public class UserInfo { public ResourcesInfo getResourceUsageInfo() { return resources; } + + public float getUserWeight() { + return userWeight; + } + + public boolean getIsActive() { + return isActive; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index de997422130..782ad38e814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -559,8 +559,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { .append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName)); diagnosticMessage.append("; "); diagnosticMessage.append("User AM Resource Limit of the queue = "); - diagnosticMessage.append( - queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName)); + diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition( + appAMNodePartitionName, getUser())); diagnosticMessage.append("; "); diagnosticMessage.append("Queue AM Resource Usage = "); diagnosticMessage.append( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index 5abc2503d14..ffcd3dff6af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -67,6 +67,7 @@ class CapacitySchedulerPage extends RmView { "left:0%;background:none;border:1px dashed #BFBFBF"; static final String Q_OVER = "background:#FFA333"; static final String Q_UNDER = "background:#5BD75B"; + static final String ACTIVE_USER = "background:#FFFF00"; // Yellow highlight @RequestScoped static class CSQInfo { @@ -208,6 +209,7 @@ class CapacitySchedulerPage extends RmView { html.table("#userinfo").thead().$class("ui-widget-header").tr().th() .$class("ui-state-default")._("User Name")._().th() .$class("ui-state-default")._("Max Resource")._().th() + .$class("ui-state-default")._("Weight")._().th() .$class("ui-state-default")._("Used Resource")._().th() .$class("ui-state-default")._("Max AM Resource")._().th() .$class("ui-state-default")._("Used AM Resource")._().th() @@ -228,8 +230,11 @@ class CapacitySchedulerPage extends RmView { ResourceInfo amUsed = (resourceUsages.getAmUsed() == null) ? new ResourceInfo(Resources.none()) : resourceUsages.getAmUsed(); - tbody.tr().td(userInfo.getUsername()) + String highlightIfAsking = + userInfo.getIsActive() ? ACTIVE_USER : null; + tbody.tr().$style(highlightIfAsking).td(userInfo.getUsername()) .td(userInfo.getUserResourceLimit().toString()) + .td(String.valueOf(userInfo.getUserWeight())) .td(resourcesUsed.toString()) .td(resourceUsages.getAMLimit().toString()) .td(amUsed.toString()) @@ -397,6 +402,8 @@ class CapacitySchedulerPage extends RmView { _("Used (over capacity)")._(). span().$class("qlegend ui-corner-all ui-state-default"). _("Max Capacity")._(). + span().$class("qlegend ui-corner-all").$style(ACTIVE_USER). + _("Users Requesting Resources")._(). _(); float used = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 0d0d78befa9..e4fe03d9cd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -26,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -3380,4 +3382,125 @@ public class TestLeafQueue { cs.stop(); } } + + @Test + public void testUserSpecificUserLimits() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + // Set minimum-user-limit-percent for queue "a" in the configs. + csConf.setUserLimit(a.getQueuePath(), 50); + // Set weight for "user_0" to be 1.5 for the a queue in the configs. + csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath() + + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT, + 1.5f); + + when(csContext.getClusterResource()) + .thenReturn(Resources.createResource(16 * GB, 32)); + // Verify that configs were updated and parsed correctly. + Assert.assertNull(a.getUserWeights().get("user_0")); + a.reinitialize(a, csContext.getClusterResource()); + assertEquals(1.5, a.getUserWeights().get("user_0").floatValue(), 0.0); + + // set maxCapacity + a.setMaxCapacity(1.0f); + + // Set minimum user-limit-percent + a.setUserLimit(50); + a.setUserLimitFactor(2); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Set user_0's weight to 1.5 in the a queue's object. + a.getUser(user_0).setWeight(1.5f); + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_1, user_1); // different user + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + // app_0 asks for 3 3-GB containers + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 3, true, + priority, recordFactory))); + + // app_1 asks for 2 1-GB containers + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + + /** + * Start testing... + */ + + // There're two active users + assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); + + // 1 container to user_0. Since queue starts out empty, user limit would + // normally be calculated to be the minumum container size (1024GB). + // However, in this case, user_0 has a weight of 1.5, so the UL is 2048GB + // because 1024 * 1.5 rounded up to container size is 2048GB. + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertEquals(4*GB, a.getUsedResources().getMemorySize()); + assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); + + // At this point the queue-wide user limit is 3072GB, but since user_0 has a + // weight of 1.5, its user limit is 5120GB. So, even though user_0 already + // has 4096GB, it is under its user limit, so it gets another container. + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertEquals(8*GB, a.getUsedResources().getMemorySize()); + assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); + + // Queue-wide user limit at this point is 4069GB and user_0's user limit is + // 6144GB. user_0 has 8192GB. + // Now that user_0 is above its user limit, the next container should go to user_1 + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertEquals(9*GB, a.getUsedResources().getMemorySize()); + assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); + + assertEquals(4*GB, + app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize()); + + assertEquals(1*GB, + app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md index 33c8c904f98..9be6652200a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md @@ -136,6 +136,7 @@ Configuration | `yarn.scheduler.capacity..user-limit-factor` | The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle th cluster is. Value is specified as a float. | | `yarn.scheduler.capacity..maximum-allocation-mb` | The per queue maximum limit of memory to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-mb`. This value must be smaller than or equal to the cluster maximum. | | `yarn.scheduler.capacity..maximum-allocation-vcores` | The per queue maximum limit of virtual cores to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-vcores`. This value must be smaller than or equal to the cluster maximum. | +| `yarn.scheduler.capacity..user-settings..weight` | This floating point value is used when calculating the user limit resource values for users in a queue. This value will weight each user more or less than the other users in the queue. For example, if user A should receive 50% more resources in a queue than users B and C, this property will be set to 1.5 for user A. Users B and C will default to 1.0. | * Running and Pending Application Limits