diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java index 757f56722ca..5f1af1e8e39 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -264,8 +265,9 @@ private TreeSet calculateIdealAssignedResourcePerApp( // Verify whether we already calculated headroom for this user. if (userLimitResource == null) { - userLimitResource = Resources.clone(tq.leafQueue - .getUserLimitPerUser(userName, partitionBasedResource, partition)); + userLimitResource = Resources.clone( + tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource, + partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)); Resource amUsed = perUserAMUsed.get(userName); if (null == amUsed) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java new file mode 100644 index 00000000000..4db35845f22 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * {@link AbstractUsersManager} tracks users in the system.
+ */
+@Private
+public interface AbstractUsersManager {
+  /**
+   * An application has new outstanding requests.
+   *
+   * @param user
+   *          application user
+   * @param applicationId
+   *          activated application
+   */
+  void activateApplication(String user, ApplicationId applicationId);
+  /**
+   * An application has no more outstanding requests.
+   *
+   * @param user
+   *          application user
+   * @param applicationId
+   *          deactivated application
+   */
+  void deactivateApplication(String user, ApplicationId applicationId);
+
+  /**
+   * Get the number of active users, i.e. users with applications that have
+   * pending resource requests.
+   *
+   * @return number of active users
+   */
+  int getNumActiveUsers();
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
index 36e68583857..049f324793d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
@@ -36,8 +36,8 @@
  * An active user is defined as someone with outstanding resource requests.
 */
@Private
-public class ActiveUsersManager {
-
+public class ActiveUsersManager implements AbstractUsersManager {
+
   private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);

   private final QueueMetrics metrics;
@@ -45,7 +45,7 @@ public class ActiveUsersManager {
   private int activeUsers = 0;
   private Map<String, Set<ApplicationId>> usersApplications =
    new HashMap<String, Set<ApplicationId>>();
-
+
  public ActiveUsersManager(QueueMetrics metrics) {
    this.metrics = metrics;
  }
@@ -57,6 +57,7 @@ public ActiveUsersManager(QueueMetrics metrics) {
   * @param applicationId activated application
   */
  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  @Override
  synchronized public void activateApplication(
      String user, ApplicationId applicationId) {
    Set<ApplicationId> userApps = usersApplications.get(user);
@@ -65,8 +66,10 @@ synchronized public void activateApplication(
      usersApplications.put(user, userApps);
      ++activeUsers;
      metrics.incrActiveUsers();
-      LOG.debug("User " + user + " added to activeUsers, currently: " +
-          activeUsers);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + user + " added to activeUsers, currently: " +
+            activeUsers);
+      }
    }
    if (userApps.add(applicationId)) {
      metrics.activateApp(user);
@@ -80,6 +83,7 @@ synchronized public void activateApplication(
   * @param applicationId deactivated application
   */
  @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  @Override
  synchronized public void deactivateApplication(
      String user, ApplicationId applicationId) {
    Set<ApplicationId> userApps = usersApplications.get(user);
@@ -91,18 +95,21 @@ synchronized public void deactivateApplication(
        usersApplications.remove(user);
        --activeUsers;
        metrics.decrActiveUsers();
-        LOG.debug("User " + user + " removed from activeUsers, currently: " +
-            activeUsers);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User " + user + " removed from activeUsers, currently: " +
+              activeUsers);
+        }
      }
    }
  }
-
+
  /**
   * Get number of active users i.e. users with applications which have pending
   * resource requests.
* @return number of active users */ @Lock({Queue.class, SchedulerApplicationAttempt.class}) + @Override synchronized public int getNumActiveUsers() { return activeUsers; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index f0b6e98a9a6..95323432212 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -66,7 +66,7 @@ public class AppSchedulingInfo { private final String user; private Queue queue; - private ActiveUsersManager activeUsersManager; + private AbstractUsersManager abstractUsersManager; // whether accepted/allocated by scheduler private volatile boolean pending = true; private ResourceUsage appResourceUsage; @@ -90,13 +90,13 @@ public class AppSchedulingInfo { public final ContainerUpdateContext updateContext; public AppSchedulingInfo(ApplicationAttemptId appAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, + String user, Queue queue, AbstractUsersManager abstractUsersManager, long epoch, ResourceUsage appResourceUsage) { this.applicationAttemptId = appAttemptId; this.applicationId = appAttemptId.getApplicationId(); this.queue = queue; this.user = user; - this.activeUsersManager = activeUsersManager; + this.abstractUsersManager = abstractUsersManager; this.containerIdCounter = new AtomicLong( epoch << ResourceManager.EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; @@ -253,7 +253,7 @@ private void updatePendingResources(ResourceRequest lastRequest, // Activate application. Metrics activation is done here. 
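      // A previous count of zero or less means this scheduler key carried no
      // outstanding containers, so this request (re)activates the user's app.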
if (lastRequestContainers <= 0) { schedulerKeys.add(schedulerKey); - activeUsersManager.activateApplication(user, applicationId); + abstractUsersManager.activateApplication(user, applicationId); } } @@ -453,7 +453,7 @@ public List allocate(NodeType type, public void checkForDeactivation() { if (schedulerKeys.isEmpty()) { - activeUsersManager.deactivateApplication(user, applicationId); + abstractUsersManager.deactivateApplication(user, applicationId); } } @@ -483,9 +483,9 @@ public void move(Queue newQueue) { } oldMetrics.moveAppFrom(this); newMetrics.moveAppTo(this); - activeUsersManager.deactivateApplication(user, applicationId); - activeUsersManager = newQueue.getActiveUsersManager(); - activeUsersManager.activateApplication(user, applicationId); + abstractUsersManager.deactivateApplication(user, applicationId); + abstractUsersManager = newQueue.getAbstractUsersManager(); + abstractUsersManager.activateApplication(user, applicationId); this.queue = newQueue; } finally { this.writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index ada2a0b1eea..d166e5fc568 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -63,7 +63,7 @@ public interface Queue { boolean hasAccess(QueueACL acl, UserGroupInformation user); - public ActiveUsersManager getActiveUsersManager(); + public AbstractUsersManager getAbstractUsersManager(); /** * Recover the state of the queue for a given container. 
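The user-limit arithmetic this patch moves out of LeafQueue#computeUserLimit (deleted further down) and into the new UsersManager is easier to sanity-check in isolation. Below is a minimal single-resource sketch of that formula using plain longs; every name here is illustrative only, and the real code works on Resource objects through ResourceCalculator and Resources rather than raw arithmetic.

// Sketch of the user-limit formula, assuming one memory-like dimension:
//   limit = min(max(currentCapacity / activeUsers,
//                   currentCapacity * userLimitPercent / 100),
//               queueCapacity * userLimitFactor)
public final class UserLimitSketch {
  static long computeUserLimit(long queueCapacity, long consumed,
      long minimumAllocation, int activeUsers, int userLimitPercent,
      float userLimitFactor) {
    // Base the limit on max(queueCapacity, consumed + required) so it keeps
    // growing while the queue runs over its guaranteed capacity.
    long required = minimumAllocation;
    long currentCapacity =
        consumed < queueCapacity ? queueCapacity : consumed + required;

    // max{currentCapacity / #activeUsers, currentCapacity * user-limit%};
    // assumes activeUsers >= 1 (the real code clamps with max(..., 1)).
    long perUserShare = (currentCapacity + activeUsers - 1) / activeUsers;
    long configuredShare = currentCapacity * userLimitPercent / 100;
    long limit = Math.max(perUserShare, configuredShare);

    // RESPECT_PARTITION_EXCLUSIVITY cap: one user never exceeds
    // queueCapacity * user-limit-factor.
    long maxUserLimit = (long) (queueCapacity * userLimitFactor);
    return Math.min(limit, maxUserLimit);
  }

  public static void main(String[] args) {
    // 100 GB queue, nothing consumed, 2 active users, user-limit 25%,
    // factor 1.0: max(50 GB, 25 GB) capped at 100 GB -> prints 50.
    long gb = 1L << 30;
    System.out.println(computeUserLimit(100 * gb, 0, gb, 2, 25, 1.0f) / gb);
  }
}

With three active users the same call yields roughly a third of the queue: active users split the current capacity evenly, but never drop below the configured user-limit percentage.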
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index ecfea18b99d..e1d714dc8e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -200,13 +200,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger(); public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, + String user, Queue queue, AbstractUsersManager abstractUsersManager, RMContext rmContext) { Preconditions.checkNotNull(rmContext, "RMContext should not be null"); this.rmContext = rmContext; this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user, queue, - activeUsersManager, rmContext.getEpoch(), attemptResourceUsage); + abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage); this.queue = queue; this.pendingRelease = Collections.newSetFromMap( new ConcurrentHashMap()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index b878e7274b7..c6726ec6b62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -240,10 +240,10 @@ public void updateClusterResource(Resource clusterResource, ResourceLimits resourceLimits); /** - * Get the {@link ActiveUsersManager} for the queue. - * @return the ActiveUsersManager for the queue + * Get the {@link AbstractUsersManager} for the queue. 
+ * @return the AbstractUsersManager for the queue */ - public ActiveUsersManager getActiveUsersManager(); + public AbstractUsersManager getAbstractUsersManager(); /** * Adds all applications in the queue and its subqueues to the given collection. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java index 5605f18b773..140a2acdbc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java @@ -26,12 +26,12 @@ public class CapacityHeadroomProvider { - LeafQueue.User user; + UsersManager.User user; LeafQueue queue; FiCaSchedulerApp application; LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo; - public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue, + public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue, FiCaSchedulerApp application, LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 8b1f8b4314a..54145dc3922 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -745,7 +745,7 @@ private void addApplicationAttempt( CSQueue queue = (CSQueue) application.getQueue(); FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, - application.getUser(), queue, queue.getActiveUsersManager(), + application.getUser(), queue, queue.getAbstractUsersManager(), rmContext, application.getPriority(), isAttemptRecovering, activitiesManager); if (transferStateFromPreviousAttempt) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 2b1efd6dc6d..c34adbec9c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; @@ -76,7 +77,6 @@ import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import java.io.IOException; @@ -101,8 +101,6 @@ public class LeafQueue extends AbstractCSQueue { private static final Log LOG = LogFactory.getLog(LeafQueue.class); private float absoluteUsedCapacity = 0.0f; - private volatile int userLimit; - private volatile float userLimitFactor; protected int maxApplications; protected volatile int maxApplicationsPerUser; @@ -122,14 +120,12 @@ public class LeafQueue extends AbstractCSQueue { private volatile float minimumAllocationFactor; - private Map users = new ConcurrentHashMap<>(); - private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private CapacitySchedulerContext scheduler; - private final ActiveUsersManager activeUsersManager; + private final UsersManager usersManager; // cache last cluster resource to compute actual capacity private Resource lastClusterResource = Resources.none(); @@ -141,10 +137,6 @@ public class LeafQueue extends AbstractCSQueue { private volatile OrderingPolicy orderingPolicy = null; - // Summation of consumed ratios for all users in queue - private float totalUserConsumedRatio = 0; - private UsageRatios qUsageRatios; - // record all ignore partition exclusivityRMContainer, this will be used to do // preemption, key is the partition of the RMContainer allocated on private Map> ignorePartitionExclusivityRMContainers = @@ -159,13 +151,12 @@ public LeafQueue(CapacitySchedulerContext cs, super(cs, queueName, parent, old); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); + this.usersManager = new UsersManager(metrics, this, labelManager, scheduler, + resourceCalculator); // One time initialization is enough since it is static ordering policy 
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); - qUsageRatios = new UsageRatios(); - if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); @@ -197,8 +188,8 @@ protected void setupQueueConfigs(Resource clusterResource) setOrderingPolicy( conf.getAppOrderingPolicy(getQueuePath())); - userLimit = conf.getUserLimit(getQueuePath()); - userLimitFactor = conf.getUserLimitFactor(getQueuePath()); + usersManager.setUserLimit(conf.getUserLimit(getQueuePath())); + usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath())); maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); if (maxApplications < 0) { @@ -212,7 +203,8 @@ protected void setupQueueConfigs(Resource clusterResource) } } maxApplicationsPerUser = Math.min(maxApplications, - (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor)); + (int) (maxApplications * (usersManager.getUserLimit() / 100.0f) + * usersManager.getUserLimitFactor())); maxAMResourcePerQueuePercent = conf.getMaximumApplicationMasterResourcePerQueuePercent( @@ -271,8 +263,9 @@ protected void setupQueueConfigs(Resource clusterResource) + queueCapacities.getAbsoluteMaximumCapacity() + " [= 1.0 maximumCapacity undefined, " + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" - + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]" - + "\n" + "userLimitFactor = " + userLimitFactor + + "\n" + "userLimit = " + usersManager.getUserLimit() + + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = " + + usersManager.getUserLimitFactor() + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = " + maxApplications + " [= configuredMaximumSystemApplicationsPerQueue or" @@ -336,9 +329,17 @@ public int getMaxApplicationsPerUser() { return maxApplicationsPerUser; } + /** + * + * @return UsersManager instance. 
+ */ + public UsersManager getUsersManager() { + return usersManager; + } + @Override - public ActiveUsersManager getActiveUsersManager() { - return activeUsersManager; + public AbstractUsersManager getAbstractUsersManager() { + return usersManager; } @Override @@ -352,7 +353,8 @@ public List getChildQueues() { */ @VisibleForTesting void setUserLimit(int userLimit) { - this.userLimit = userLimit; + usersManager.setUserLimit(userLimit); + usersManager.userLimitNeedsRecompute(); } /** @@ -361,7 +363,8 @@ void setUserLimit(int userLimit) { */ @VisibleForTesting void setUserLimitFactor(float userLimitFactor) { - this.userLimitFactor = userLimitFactor; + usersManager.setUserLimitFactor(userLimitFactor); + usersManager.userLimitNeedsRecompute(); } @Override @@ -422,12 +425,12 @@ public int getNumActiveApplications(String user) { @Private public int getUserLimit() { - return userLimit; + return usersManager.getUserLimit(); } @Private public float getUserLimitFactor() { - return userLimitFactor; + return usersManager.getUserLimitFactor(); } @Override @@ -477,44 +480,7 @@ public String toString() { @VisibleForTesting public User getUser(String userName) { - return users.get(userName); - } - - // Get and add user if absent - private User getUserAndAddIfAbsent(String userName) { - try { - writeLock.lock(); - User u = users.get(userName); - if (null == u) { - u = new User(); - users.put(userName, u); - } - return u; - } finally { - writeLock.unlock(); - } - } - - /** - * @return an ArrayList of UserInfo objects who are active in this queue - */ - public ArrayList getUsers() { - try { - readLock.lock(); - ArrayList usersToReturn = new ArrayList(); - for (Map.Entry entry : users.entrySet()) { - User user = entry.getValue(); - usersToReturn.add( - new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()), - user.getActiveApplications(), user.getPendingApplications(), - Resources.clone(user.getConsumedAMResources()), - Resources.clone(user.getUserResourceLimit()), - user.getResourceUsage())); - } - return usersToReturn; - } finally { - readLock.unlock(); - } + return usersManager.getUser(userName); } @Private @@ -575,7 +541,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); // Add the attempt to our data-structures addApplicationAttempt(application, user); @@ -632,7 +598,7 @@ public void validateSubmitApplication(ApplicationId applicationId, } // Check submission limits for the user on this queue - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { String msg = "Queue " + getQueuePath() + " already has " + user .getTotalApplications() + " applications from user " + userName @@ -682,19 +648,21 @@ public Resource getUserAMResourceLimitPerPartition( * the absolute queue capacity (per partition) instead of the max and is * modified by the userlimit and the userlimit factor as is the userlimit */ - float effectiveUserLimit = Math.max(userLimit / 100.0f, - 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); + float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f, + 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1)); - Resource queuePartitionResource = 
Resources.multiplyAndNormalizeUp( - resourceCalculator, - labelManager.getResourceByLabel(nodePartition, lastClusterResource), - queueCapacities.getAbsoluteCapacity(nodePartition), - minimumAllocation); + Resource queuePartitionResource = Resources + .multiplyAndNormalizeUp(resourceCalculator, + labelManager.getResourceByLabel(nodePartition, + lastClusterResource), + queueCapacities.getAbsoluteCapacity(nodePartition), + minimumAllocation); Resource userAMLimit = Resources.multiplyAndNormalizeUp( resourceCalculator, queuePartitionResource, queueCapacities.getMaxAMResourcePercentage(nodePartition) - * effectiveUserLimit * userLimitFactor, minimumAllocation); + * effectiveUserLimit * usersManager.getUserLimitFactor(), + minimumAllocation); return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ? userAMLimit : @@ -910,7 +878,7 @@ private void addApplicationAttempt(FiCaSchedulerApp application, @Override public void finishApplication(ApplicationId application, String user) { // Inform the activeUsersManager - activeUsersManager.deactivateApplication(user, application); + usersManager.deactivateApplication(user, application); appFinished(); @@ -932,7 +900,7 @@ private void removeApplicationAttempt( // TODO, should use getUser, use this method just to avoid UT failure // which is caused by wrong invoking order, will fix UT separately - User user = getUserAndAddIfAbsent(userName); + User user = usersManager.getUserAndAddIfAbsent(userName); String partitionName = application.getAppAMNodePartitionName(); boolean wasActive = orderingPolicy.removeSchedulableEntity(application); @@ -950,7 +918,7 @@ private void removeApplicationAttempt( user.finishApplication(wasActive); if (user.getTotalApplications() == 0) { - users.remove(application.getUser()); + usersManager.removeUser(application.getUser()); } // Check if we can activate more applications @@ -1291,7 +1259,7 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit, Resource clusterResource, FiCaSchedulerApp application, String partition) { return getHeadroom(user, queueCurrentLimit, clusterResource, - computeUserLimit(application.getUser(), clusterResource, user, + getResourceLimitForActiveUsers(application.getUser(), clusterResource, partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition); } @@ -1365,7 +1333,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also Resource userLimit = - computeUserLimit(application.getUser(), clusterResource, queueUser, + getResourceLimitForActiveUsers(application.getUser(), clusterResource, nodePartition, schedulingMode); setQueueResourceLimitsInfo(clusterResource); @@ -1375,11 +1343,11 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, clusterResource, userLimit, nodePartition); if (LOG.isDebugEnabled()) { - LOG.debug("Headroom calculation for user " + user + ": " + - " userLimit=" + userLimit + - " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + - " consumed=" + queueUser.getUsed() + - " headroom=" + headroom); + LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + + userLimit + " queueMaxAvailRes=" + + cachedResourceLimitsForHeadroom.getLimit() + " consumed=" + + queueUser.getUsed() + " headroom=" + headroom + " partition=" + + nodePartition); } CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider( @@ 
-1407,129 +1375,46 @@ public boolean getRackLocalityFullReset() { return rackLocalityFullReset; } - @Lock(NoLock.class) - private Resource computeUserLimit(String userName, - Resource clusterResource, User user, - String nodePartition, SchedulingMode schedulingMode) { - Resource partitionResource = labelManager.getResourceByLabel(nodePartition, - clusterResource); - - // What is our current capacity? - // * It is equal to the max(required, queue-capacity) if - // we're running below capacity. The 'max' ensures that jobs in queues - // with miniscule capacity (< 1 slot) make progress - // * If we're running over capacity, then its - // (usedResources + required) (which extra resources we are allocating) - Resource queueCapacity = - Resources.multiplyAndNormalizeUp(resourceCalculator, - partitionResource, - queueCapacities.getAbsoluteCapacity(nodePartition), - minimumAllocation); - - // Assume we have required resource equals to minimumAllocation, this can - // make sure user limit can continuously increase till queueMaxResource - // reached. - Resource required = minimumAllocation; - - // Allow progress for queues with miniscule capacity - queueCapacity = - Resources.max( - resourceCalculator, partitionResource, - queueCapacity, - required); - - - /* We want to base the userLimit calculation on - * max(queueCapacity, usedResources+required). However, we want - * usedResources to be based on the combined ratios of all the users in the - * queue so we use consumedRatio to calculate such. - * The calculation is dependent on how the resourceCalculator calculates the - * ratio between two Resources. DRF Example: If usedResources is - * greater than queueCapacity and users have the following [mem,cpu] usages: - * User1: [10%,20%] - Dominant resource is 20% - * User2: [30%,10%] - Dominant resource is 30% - * Then total consumedRatio is then 20+30=50%. Yes, this value can be - * larger than 100% but for the purposes of making sure all users are - * getting their fair share, it works. - */ - Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator, - partitionResource, qUsageRatios.getUsageRatio(nodePartition), - minimumAllocation); - Resource currentCapacity = - Resources.lessThan(resourceCalculator, partitionResource, consumed, - queueCapacity) ? queueCapacity : Resources.add(consumed, required); - // Never allow a single user to take more than the - // queue's configured capacity * user-limit-factor. - // Also, the queue's configured capacity should be higher than - // queue-hard-limit * ulMin - - final int activeUsers = activeUsersManager.getNumActiveUsers(); - - // User limit resource is determined by: - // max{currentCapacity / #activeUsers, currentCapacity * - // user-limit-percentage%) - Resource userLimitResource = Resources.max( - resourceCalculator, partitionResource, - Resources.divideAndCeil( - resourceCalculator, currentCapacity, activeUsers), - Resources.divideAndCeil( - resourceCalculator, - Resources.multiplyAndRoundDown( - currentCapacity, userLimit), - 100) - ); - - // User limit is capped by maxUserLimit - // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY) - // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY) - // - // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a - // partition, its guaranteed resource on that partition is 0. And - // user-limit-factor computation is based on queue's guaranteed capacity. 
So - // we will not cap user-limit as well as used resource when doing - // IGNORE_PARTITION_EXCLUSIVITY allocation. - Resource maxUserLimit = Resources.none(); - if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - maxUserLimit = - Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor); - } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { - maxUserLimit = partitionResource; - } - - // Cap final user limit with maxUserLimit - userLimitResource = - Resources.roundUp( - resourceCalculator, - Resources.min( - resourceCalculator, partitionResource, - userLimitResource, - maxUserLimit - ), - minimumAllocation); - - if (LOG.isDebugEnabled()) { - LOG.debug("User limit computation for " + userName + - " in queue " + getQueueName() + - " userLimitPercent=" + userLimit + - " userLimitFactor=" + userLimitFactor + - " required: " + required + - " consumed: " + consumed + - " user-limit-resource: " + userLimitResource + - " queueCapacity: " + queueCapacity + - " qconsumed: " + queueUsage.getUsed() + - " consumedRatio: " + totalUserConsumedRatio + - " currentCapacity: " + currentCapacity + - " activeUsers: " + activeUsers + - " clusterCapacity: " + clusterResource + - " resourceByLabel: " + partitionResource + - " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) + - " Partition: " + nodePartition - ); - } - user.setUserResourceLimit(userLimitResource); - return userLimitResource; + /** + * + * @param userName + * Name of user who has submitted one/more app to given queue. + * @param clusterResource + * total cluster resource + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @return Computed User Limit + */ + public Resource getResourceLimitForActiveUsers(String userName, + Resource clusterResource, String nodePartition, + SchedulingMode schedulingMode) { + return usersManager.getComputedResourceLimitForActiveUsers(userName, + clusterResource, nodePartition, schedulingMode); } - + + /** + * + * @param userName + * Name of user who has submitted one/more app to given queue. + * @param clusterResource + * total cluster resource + * @param nodePartition + * partition name + * @param schedulingMode + * scheduling mode + * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY + * @return Computed User Limit + */ + public Resource getResourceLimitForAllUsers(String userName, + Resource clusterResource, String nodePartition, + SchedulingMode schedulingMode) { + return usersManager.getComputedResourceLimitForAllUsers(userName, + clusterResource, nodePartition, schedulingMode); + } + @Private protected boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, @@ -1600,52 +1485,34 @@ private void updateSchedulerHealthForCompletedContainer( } } - private float calculateUserUsageRatio(Resource clusterResource, + /** + * Recalculate QueueUsage Ratio. 
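+   * When nodePartition is null, the ratio is refreshed for every partition
+   * known to the queue's capacities or its resource usage.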
+ * + * @param clusterResource + * Total Cluster Resource + * @param nodePartition + * Partition + */ + public void recalculateQueueUsageRatio(Resource clusterResource, String nodePartition) { try { writeLock.lock(); - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - clusterResource); - float consumed = 0; - User user; - for (Map.Entry entry : users.entrySet()) { - user = entry.getValue(); - consumed += user.resetAndUpdateUsageRatio(resourceCalculator, - resourceByLabel, nodePartition); - } - return consumed; - } finally { - writeLock.unlock(); - } - } - - private void recalculateQueueUsageRatio(Resource clusterResource, - String nodePartition) { - try { - writeLock.lock(); - ResourceUsage queueResourceUsage = this.getQueueResourceUsage(); + ResourceUsage queueResourceUsage = getQueueResourceUsage(); if (nodePartition == null) { for (String partition : Sets.union( - queueCapacities.getNodePartitionsSet(), + getQueueCapacities().getNodePartitionsSet(), queueResourceUsage.getNodePartitionsSet())) { - qUsageRatios.setUsageRatio(partition, - calculateUserUsageRatio(clusterResource, partition)); + usersManager.updateUsageRatio(partition, clusterResource); } - } else{ - qUsageRatios.setUsageRatio(nodePartition, - calculateUserUsageRatio(clusterResource, nodePartition)); + } else { + usersManager.updateUsageRatio(nodePartition, clusterResource); } } finally { writeLock.unlock(); } } - private void updateQueueUsageRatio(String nodePartition, - float delta) { - qUsageRatios.incUsageRatio(nodePartition, delta); - } - @Override public void completedContainer(Resource clusterResource, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, @@ -1708,8 +1575,6 @@ void allocateResource(Resource clusterResource, try { writeLock.lock(); super.allocateResource(clusterResource, resource, nodePartition); - Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, - clusterResource); // handle ignore exclusivity container if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( @@ -1728,16 +1593,9 @@ void allocateResource(Resource clusterResource, // Update user metrics String userName = application.getUser(); - // TODO, should use getUser, use this method just to avoid UT failure - // which is caused by wrong invoking order, will fix UT separately - User user = getUserAndAddIfAbsent(userName); - - user.assignContainer(resource, nodePartition); - - // Update usage ratios - updateQueueUsageRatio(nodePartition, - user.updateUsageRatio(resourceCalculator, resourceByLabel, - nodePartition)); + // Increment user's resource usage. 
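+      // The trailing flag selects the direction: true increments usage on
+      // allocate; releaseResource below passes false to decrement.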
+      User user = usersManager.updateUserResourceUsage(userName, resource,
+          nodePartition, true);

      // Note this is a bit unconventional since it gets the object and modifies
      // it here, rather then using set routine
@@ -1746,9 +1604,10 @@ void allocateResource(Resource clusterResource,
          userName, application.getHeadroom());

      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage
-            .getUsed() + " numContainers=" + numContainers + " headroom = "
-            + application.getHeadroom() + " user-resources=" + user.getUsed());
+        LOG.debug(getQueueName() + " user=" + userName + " used=" +
+            queueUsage.getUsed(nodePartition) + " numContainers=" +
+            numContainers + " headroom = " + application.getHeadroom() +
+            " user-resources=" + user.getUsed());
      }
    } finally {
      writeLock.unlock();
@@ -1761,8 +1620,6 @@ void releaseResource(Resource clusterResource,
    try {
      writeLock.lock();
      super.releaseResource(clusterResource, resource, nodePartition);
-      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
-          clusterResource);

      // handle ignore exclusivity container
      if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1780,13 +1637,8 @@ void releaseResource(Resource clusterResource,
      // Update user metrics
      String userName = application.getUser();
-      User user = getUserAndAddIfAbsent(userName);
-      user.releaseContainer(resource, nodePartition);
-
-      // Update usage ratios
-      updateQueueUsageRatio(nodePartition,
-          user.updateUsageRatio(resourceCalculator, resourceByLabel,
-              nodePartition));
+      User user = usersManager.updateUserResourceUsage(userName, resource,
+          nodePartition, false);

      metrics.setAvailableResourcesToUser(nodePartition, userName,
          application.getHeadroom());
@@ -1846,6 +1698,10 @@ public void updateClusterResource(Resource clusterResource,
    // activate the pending applications if possible
    activateApplications();

+    // In case of any resource change, bump the users-state version so that
+    // stale pre-computed user-limits are recomputed on the next access.
+ usersManager.userLimitNeedsRecompute(); + // Update application properties for (FiCaSchedulerApp application : orderingPolicy .getSchedulableEntities()) { @@ -1861,16 +1717,16 @@ public void updateClusterResource(Resource clusterResource, @Override public void incUsedResource(String nodeLabel, Resource resourceToInc, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel, - resourceToInc); + usersManager.updateUserResourceUsage(application.getUser(), resourceToInc, + nodeLabel, true); super.incUsedResource(nodeLabel, resourceToInc, application); } @Override public void decUsedResource(String nodeLabel, Resource resourceToDec, SchedulerApplicationAttempt application) { - getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel, - resourceToDec); + usersManager.updateUserResourceUsage(application.getUser(), resourceToDec, + nodeLabel, false); super.decUsedResource(nodeLabel, resourceToDec, application); } @@ -1890,191 +1746,6 @@ public void decAMUsedResource(String nodeLabel, Resource resourceToDec, queueUsage.decAMUsed(nodeLabel, resourceToDec); } - /* - * Usage Ratio - */ - static private class UsageRatios { - private Map usageRatios; - private ReadLock readLock; - private WriteLock writeLock; - - public UsageRatios() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - usageRatios = new HashMap(); - } - - private void incUsageRatio(String label, float delta) { - try { - writeLock.lock(); - Float fl = usageRatios.get(label); - if (null == fl) { - fl = new Float(0.0); - } - fl += delta; - usageRatios.put(label, new Float(fl)); - } finally { - writeLock.unlock(); - } - } - - float getUsageRatio(String label) { - try { - readLock.lock(); - Float f = usageRatios.get(label); - if (null == f) { - return 0.0f; - } - return f; - } finally { - readLock.unlock(); - } - } - - private void setUsageRatio(String label, float ratio) { - try { - writeLock.lock(); - usageRatios.put(label, new Float(ratio)); - } finally { - writeLock.unlock(); - } - } - } - - @VisibleForTesting - public float getUsageRatio(String label) { - return qUsageRatios.getUsageRatio(label); - } - - @VisibleForTesting - public static class User { - ResourceUsage userResourceUsage = new ResourceUsage(); - volatile Resource userResourceLimit = Resource.newInstance(0, 0); - volatile int pendingApplications = 0; - volatile int activeApplications = 0; - private UsageRatios userUsageRatios = new UsageRatios(); - private WriteLock writeLock; - - User() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - // Nobody uses read-lock now, will add it when necessary - writeLock = lock.writeLock(); - } - - public ResourceUsage getResourceUsage() { - return userResourceUsage; - } - - public float resetAndUpdateUsageRatio( - ResourceCalculator resourceCalculator, - Resource resource, String nodePartition) { - try { - writeLock.lock(); - userUsageRatios.setUsageRatio(nodePartition, 0); - return updateUsageRatio(resourceCalculator, resource, nodePartition); - } finally { - writeLock.unlock(); - } - } - - public float updateUsageRatio( - ResourceCalculator resourceCalculator, - Resource resource, String nodePartition) { - try { - writeLock.lock(); - float delta; - float newRatio = Resources.ratio(resourceCalculator, - getUsed(nodePartition), resource); - delta = newRatio - userUsageRatios.getUsageRatio(nodePartition); - userUsageRatios.setUsageRatio(nodePartition, newRatio); - return delta; - } 
finally { - writeLock.unlock(); - } - } - - public Resource getUsed() { - return userResourceUsage.getUsed(); - } - - public Resource getAllUsed() { - return userResourceUsage.getAllUsed(); - } - - public Resource getUsed(String label) { - return userResourceUsage.getUsed(label); - } - - public int getPendingApplications() { - return pendingApplications; - } - - public int getActiveApplications() { - return activeApplications; - } - - public Resource getConsumedAMResources() { - return userResourceUsage.getAMUsed(); - } - - public Resource getConsumedAMResources(String label) { - return userResourceUsage.getAMUsed(label); - } - - public int getTotalApplications() { - return getPendingApplications() + getActiveApplications(); - } - - public void submitApplication() { - try { - writeLock.lock(); - ++pendingApplications; - } finally { - writeLock.unlock(); - } - } - - public void activateApplication() { - try { - writeLock.lock(); - --pendingApplications; - ++activeApplications; - } finally { - writeLock.unlock(); - } - } - - public void finishApplication(boolean wasActive) { - try { - writeLock.lock(); - if (wasActive) { - --activeApplications; - } else{ - --pendingApplications; - } - } finally { - writeLock.unlock(); - } - } - - public void assignContainer(Resource resource, String nodePartition) { - userResourceUsage.incUsed(nodePartition, resource); - } - - public void releaseContainer(Resource resource, String nodePartition) { - userResourceUsage.decUsed(nodePartition, resource); - } - - public Resource getUserResourceLimit() { - return userResourceLimit; - } - - public void setUserResourceLimit(Resource userResourceLimit) { - this.userResourceLimit = userResourceLimit; - } - } - @Override public void recoverContainer(Resource clusterResource, SchedulerApplicationAttempt attempt, RMContainer rmContainer) { @@ -2144,9 +1815,9 @@ public Collection getAllApplications() { * excessive preemption. * @return Total pending resource considering user limit */ - public Resource getTotalPendingResourcesConsideringUserLimit( - Resource clusterResources, String partition, boolean deductReservedFromPending) { + Resource clusterResources, String partition, + boolean deductReservedFromPending) { try { readLock.lock(); Map userNameToHeadroom = @@ -2157,8 +1828,8 @@ public Resource getTotalPendingResourcesConsideringUserLimit( if (!userNameToHeadroom.containsKey(userName)) { User user = getUser(userName); Resource headroom = Resources.subtract( - computeUserLimit(app.getUser(), clusterResources, user, partition, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + getResourceLimitForActiveUsers(app.getUser(), clusterResources, + partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), user.getUsed(partition)); // Make sure headroom is not negative. 
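        // (componentwiseMax clamps memory and vcores to zero independently.)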
headroom = Resources.componentwiseMax(headroom, Resources.none()); @@ -2188,16 +1859,6 @@ public Resource getTotalPendingResourcesConsideringUserLimit( } - public synchronized Resource getUserLimitPerUser(String userName, - Resource resources, String partition) { - - // Check user resource limit - User user = getUser(userName); - - return computeUserLimit(userName, resources, user, partition, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - } - @Override public void collectSchedulerApplications( Collection apps) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 0206211c313..a9ccefcd89d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -884,7 +884,7 @@ public void recoverContainer(Resource clusterResource, } @Override - public ActiveUsersManager getActiveUsersManager() { + public ActiveUsersManager getAbstractUsersManager() { // Should never be called since all applications are submitted to LeafQueues return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java new file mode 100644 index 00000000000..05503c6263d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -0,0 +1,982 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link UsersManager} tracks users in the system and their respective data
+ * structures.
+ */
+@Private
+public class UsersManager implements AbstractUsersManager {
+
+  private static final Log LOG = LogFactory.getLog(UsersManager.class);
+
+  /*
+   * Member declaration for UsersManager class.
+   */
+  private final LeafQueue lQueue;
+  private final RMNodeLabelsManager labelManager;
+  private final ResourceCalculator resourceCalculator;
+  private final CapacitySchedulerContext scheduler;
+  private Map<String, User> users = new ConcurrentHashMap<>();
+
+  private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage();
+  private ResourceUsage totalResUsageForNonActiveUsers = new ResourceUsage();
+  private Set<String> activeUsersSet = new HashSet<String>();
+  private Set<String> nonActiveUsersSet = new HashSet<String>();
+
+  // Summation of consumed ratios for all users in queue
+  private UsageRatios qUsageRatios;
+
+  // To detect whether there is a change in user count for every user-limit
+  // calculation.
+  private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
+  private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
+      new HashMap<String, Map<SchedulingMode, Long>>();
+  private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
+      new HashMap<String, Map<SchedulingMode, Long>>();
+
+  private volatile int userLimit;
+  private volatile float userLimitFactor;
+
+  private WriteLock writeLock;
+  private ReadLock readLock;
+
+  private final QueueMetrics metrics;
+  private AtomicInteger activeUsers = new AtomicInteger(0);
+  private Map<String, Set<ApplicationId>> usersApplications =
+      new HashMap<String, Set<ApplicationId>>();
+
+  // Pre-computed list of user-limits, keyed by partition.
+  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit =
+      new ConcurrentHashMap<>();
+  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit =
+      new ConcurrentHashMap<>();
+
+  /**
+   * UsageRatios will store the total used resources ratio across all users of
+   * the queue.
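+   * The ratio is resource-calculator dependent; with DRF each user
+   * contributes the share of the partition used by their dominant resource.
+   * E.g. users at [10% mem, 20% cpu] and [30% mem, 10% cpu] contribute
+   * 20% and 30%, so the queue-wide total is 50% (it may exceed 100%).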
+   */
+  static private class UsageRatios {
+    private Map<String, Float> usageRatios;
+    private ReadLock readLock;
+    private WriteLock writeLock;
+
+    public UsageRatios() {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      readLock = lock.readLock();
+      writeLock = lock.writeLock();
+      usageRatios = new HashMap<String, Float>();
+    }
+
+    private void incUsageRatio(String label, float delta) {
+      try {
+        writeLock.lock();
+        float usage = 0f;
+        if (usageRatios.containsKey(label)) {
+          usage = usageRatios.get(label);
+        }
+        usage += delta;
+        usageRatios.put(label, usage);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    private float getUsageRatio(String label) {
+      try {
+        readLock.lock();
+        Float f = usageRatios.get(label);
+        if (null == f) {
+          return 0.0f;
+        }
+        return f;
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private void setUsageRatio(String label, float ratio) {
+      try {
+        writeLock.lock();
+        usageRatios.put(label, ratio);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  } /* End of UsageRatios class */
+
+  /**
+   * User class stores all user-related resource usage and application details.
+   */
+  @VisibleForTesting
+  public static class User {
+    ResourceUsage userResourceUsage = new ResourceUsage();
+    String userName = null;
+    volatile Resource userResourceLimit = Resource.newInstance(0, 0);
+    private volatile AtomicInteger pendingApplications = new AtomicInteger(0);
+    private volatile AtomicInteger activeApplications = new AtomicInteger(0);
+
+    private UsageRatios userUsageRatios = new UsageRatios();
+    private WriteLock writeLock;
+
+    public User(String name) {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      // Nobody uses read-lock now, will add it when necessary
+      writeLock = lock.writeLock();
+
+      this.userName = name;
+    }
+
+    public ResourceUsage getResourceUsage() {
+      return userResourceUsage;
+    }
+
+    public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      try {
+        writeLock.lock();
+        userUsageRatios.setUsageRatio(nodePartition, 0);
+        return updateUsageRatio(resourceCalculator, resource, nodePartition);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public float updateUsageRatio(ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      try {
+        writeLock.lock();
+        float delta;
+        float newRatio = Resources.ratio(resourceCalculator,
+            getUsed(nodePartition), resource);
+        delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
+        userUsageRatios.setUsageRatio(nodePartition, newRatio);
+        return delta;
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public Resource getUsed() {
+      return userResourceUsage.getUsed();
+    }
+
+    public Resource getAllUsed() {
+      return userResourceUsage.getAllUsed();
+    }
+
+    public Resource getUsed(String label) {
+      return userResourceUsage.getUsed(label);
+    }
+
+    public int getPendingApplications() {
+      return pendingApplications.get();
+    }
+
+    public int getActiveApplications() {
+      return activeApplications.get();
+    }
+
+    public Resource getConsumedAMResources() {
+      return userResourceUsage.getAMUsed();
+    }
+
+    public Resource getConsumedAMResources(String label) {
+      return userResourceUsage.getAMUsed(label);
+    }
+
+    public int getTotalApplications() {
+      return getPendingApplications() + getActiveApplications();
+    }
+
+    public void submitApplication() {
+      pendingApplications.incrementAndGet();
+    }
+
+    public void activateApplication() {
+      pendingApplications.decrementAndGet();
+      activeApplications.incrementAndGet();
+    }
+
+    public void finishApplication(boolean wasActive) {
+
+    public Resource getUserResourceLimit() {
+      return userResourceLimit;
+    }
+
+    public void setUserResourceLimit(Resource userResourceLimit) {
+      this.userResourceLimit = userResourceLimit;
+    }
+  } /* End of User class */
+
+  /**
+   * UsersManager Constructor.
+   *
+   * @param metrics
+   *          Queue Metrics
+   * @param lQueue
+   *          Leaf Queue Object
+   * @param labelManager
+   *          Label Manager instance
+   * @param scheduler
+   *          Capacity Scheduler Context
+   * @param resourceCalculator
+   *          Resource Calculator
+   */
+  public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
+      RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
+      ResourceCalculator resourceCalculator) {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.lQueue = lQueue;
+    this.scheduler = scheduler;
+    this.labelManager = labelManager;
+    this.resourceCalculator = resourceCalculator;
+    this.qUsageRatios = new UsageRatios();
+    this.metrics = metrics;
+
+    this.writeLock = lock.writeLock();
+    this.readLock = lock.readLock();
+  }
+
+  /**
+   * Get configured user-limit.
+   * @return user limit
+   */
+  public int getUserLimit() {
+    return userLimit;
+  }
+
+  /**
+   * Set configured user-limit.
+   * @param userLimit user limit
+   */
+  public void setUserLimit(int userLimit) {
+    this.userLimit = userLimit;
+  }
+
+  /**
+   * Get configured user-limit factor.
+   * @return user-limit factor
+   */
+  public float getUserLimitFactor() {
+    return userLimitFactor;
+  }
+
+  /**
+   * Set configured user-limit factor.
+   * @param userLimitFactor User Limit factor.
+   */
+  public void setUserLimitFactor(float userLimitFactor) {
+    this.userLimitFactor = userLimitFactor;
+  }
+
+  @VisibleForTesting
+  public float getUsageRatio(String label) {
+    return qUsageRatios.getUsageRatio(label);
+  }
+
+  /**
+   * Force UsersManager to recompute the user limit.
+   */
+  public void userLimitNeedsRecompute() {
+
+    // If latestVersionOfUsersState overflows and turns negative, reset it to
+    // 0. This method is invoked from UsersManager and LeafQueue, and always
+    // runs under the write/read lock, so the reset below is safe.
+    try {
+      writeLock.lock();
+
+      long value = latestVersionOfUsersState.incrementAndGet();
+      if (value < 0) {
+        latestVersionOfUsersState.set(0);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /*
+   * Get all users of the queue.
+   */
+  private Map<String, User> getUsers() {
+    return users;
+  }
+
+  /**
+   * Get user object for the given user name.
+   *
+   * @param userName
+   *          User Name
+   * @return User object
+   */
+  public User getUser(String userName) {
+    return users.get(userName);
+  }
+
+  /**
+   * Remove user.
+   *
+   * @param userName
+   *          User Name
+   */
+  public void removeUser(String userName) {
+    try {
+      writeLock.lock();
+      this.users.remove(userName);
+
+      // Remove user from the active/non-active lists as well.
+      activeUsersSet.remove(userName);
+      nonActiveUsersSet.remove(userName);
+    } finally {
+      writeLock.unlock();
+    }
+  }
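+  // Editorial note (not part of the original patch): user-limit results are
+  // cached per (partition, scheduling mode). Cache validity is tracked with
+  // one global counter, latestVersionOfUsersState, which
+  // userLimitNeedsRecompute() bumps on every relevant change (user added or
+  // removed, container allocated or released, activation change). Each cache
+  // entry remembers the counter value it was computed at; a lookup whose
+  // stored version differs from the current counter triggers a recompute.
+  // Illustrative timeline, with assumed values:
+  //
+  //   latestVersionOfUsersState = 7, cached version for ("x", RESPECT) = 7
+  //     -> cache hit, no recompute
+  //   container allocated -> latestVersionOfUsersState = 8
+  //   next lookup for ("x", RESPECT): 7 != 8 -> recompute, store version 8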
+
+  /**
+   * Get user object, adding it if absent.
+   *
+   * @param userName
+   *          User Name
+   * @return User object
+   */
+  public User getUserAndAddIfAbsent(String userName) {
+    try {
+      writeLock.lock();
+      User u = getUser(userName);
+      if (null == u) {
+        u = new User(userName);
+        addUser(userName, u);
+
+        // Add to the non-active list so that resourceUsage can be tracked.
+        if (!nonActiveUsersSet.contains(userName)) {
+          nonActiveUsersSet.add(userName);
+        }
+      }
+      return u;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /*
+   * Add a new user.
+   */
+  private void addUser(String userName, User user) {
+    this.users.put(userName, user);
+  }
+
+  /**
+   * @return an ArrayList of UserInfo objects for all users in this queue
+   */
+  public ArrayList<UserInfo> getUsersInfo() {
+    try {
+      readLock.lock();
+      ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
+      for (Map.Entry<String, User> entry : getUsers().entrySet()) {
+        User user = entry.getValue();
+        usersToReturn.add(
+            new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
+                user.getActiveApplications(), user.getPendingApplications(),
+                Resources.clone(user.getConsumedAMResources()),
+                Resources.clone(user.getUserResourceLimit()),
+                user.getResourceUsage()));
+      }
+      return usersToReturn;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get the computed user-limit for all ACTIVE users in this queue. If the
+   * cached data has been invalidated by a resource change, this method also
+   * forces a recomputation of the user-limit.
+   *
+   * @param userName
+   *          Name of a user who has submitted one or more apps to the given
+   *          queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getComputedResourceLimitForActiveUsers(String userName,
+      Resource clusterResource, String nodePartition,
+      SchedulingMode schedulingMode) {
+
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+        preComputedActiveUserLimit.get(nodePartition);
+
+    try {
+      writeLock.lock();
+      if (isRecomputeNeeded(schedulingMode, nodePartition, true)) {
+        // Recompute.
+        userLimitPerSchedulingMode = reComputeUserLimits(userName,
+            nodePartition, clusterResource, schedulingMode, true);
+
+        // Record the current user-state version so that recomputation can be
+        // skipped while no major change happens.
+        setLocalVersionOfUsersState(nodePartition, schedulingMode, true);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("userLimit is fetched. userLimit = "
+          + userLimitPerSchedulingMode.get(schedulingMode)
+          + ", schedulingMode=" + schedulingMode
+          + ", partition=" + nodePartition);
+    }
+
+    return userLimitPerSchedulingMode.get(schedulingMode);
+  }
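+  // Editorial note (not part of the original patch): a hypothetical caller,
+  // for illustration only; the local variable names are assumptions.
+  //
+  //   // Headroom-style query: divide capacity among users that currently
+  //   // have outstanding requests.
+  //   Resource activeLimit =
+  //       usersManager.getComputedResourceLimitForActiveUsers("alice",
+  //           clusterResource, "x",
+  //           SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+  //
+  //   // Preemption-style query: divide capacity among every known user.
+  //   Resource allLimit =
+  //       usersManager.getComputedResourceLimitForAllUsers("alice",
+  //           clusterResource, "x",
+  //           SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);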
+
+  /**
+   * Get the computed user-limit for all users in this queue. If the cached
+   * data has been invalidated by a resource change, this method also forces
+   * a recomputation of the user-limit.
+   *
+   * @param userName
+   *          Name of a user who has submitted one or more apps to the given
+   *          queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getComputedResourceLimitForAllUsers(String userName,
+      Resource clusterResource, String nodePartition,
+      SchedulingMode schedulingMode) {
+
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+        preComputedAllUserLimit.get(nodePartition);
+
+    try {
+      writeLock.lock();
+      if (isRecomputeNeeded(schedulingMode, nodePartition, false)) {
+        // Recompute.
+        userLimitPerSchedulingMode = reComputeUserLimits(userName,
+            nodePartition, clusterResource, schedulingMode, false);
+
+        // Record the current user-state version so that recomputation can be
+        // skipped while no major change happens.
+        setLocalVersionOfUsersState(nodePartition, schedulingMode, false);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("userLimit is fetched. userLimit = "
+          + userLimitPerSchedulingMode.get(schedulingMode)
+          + ", schedulingMode=" + schedulingMode
+          + ", partition=" + nodePartition);
+    }
+
+    return userLimitPerSchedulingMode.get(schedulingMode);
+  }
+
+  /*
+   * Recompute the user-limit when either: 1. no cached user-limit exists in
+   * the local map yet, or 2. the locally cached user-state version no longer
+   * matches the latest version.
+   */
+  private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
+      String nodePartition, boolean isActive) {
+    return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
+        isActive) != latestVersionOfUsersState.get());
+  }
+
+  /*
+   * Set the local version of the user state per label, used to invalidate
+   * the cache when needed.
+   */
+  private void setLocalVersionOfUsersState(String nodePartition,
+      SchedulingMode schedulingMode, boolean isActive) {
+    try {
+      writeLock.lock();
+      Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState =
+          (isActive)
+              ? localVersionOfActiveUsersState
+              : localVersionOfAllUsersState;
+
+      Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
+          .get(nodePartition);
+      if (null == localVersion) {
+        localVersion = new HashMap<SchedulingMode, Long>();
+        localVersionOfUsersState.put(nodePartition, localVersion);
+      }
+
+      localVersion.put(schedulingMode, latestVersionOfUsersState.get());
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /*
+   * Get the local version of the user state per label, used to invalidate
+   * the cache when needed.
+   */
+  private long getLocalVersionOfUsersState(String nodePartition,
+      SchedulingMode schedulingMode, boolean isActive) {
+    try {
+      this.readLock.lock();
+      Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState =
+          (isActive)
+              ? localVersionOfActiveUsersState
+              : localVersionOfAllUsersState;
+
+      if (!localVersionOfUsersState.containsKey(nodePartition)) {
+        return -1;
+      }
+
+      Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
+          .get(nodePartition);
+      if (!localVersion.containsKey(schedulingMode)) {
+        return -1;
+      }
+
+      return localVersion.get(schedulingMode);
+    } finally {
+      readLock.unlock();
+    }
+  }
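+  // Editorial note (not part of the original patch): shape of the cached
+  // state, for orientation. Both caches and both version maps are keyed the
+  // same way:
+  //
+  //   preComputedActiveUserLimit / preComputedAllUserLimit :
+  //       partition -> (SchedulingMode -> Resource)
+  //   localVersionOfActiveUsersState / localVersionOfAllUsersState :
+  //       partition -> (SchedulingMode -> Long version)
+  //
+  // A missing entry in the version maps is reported as -1, which never
+  // equals the (non-negative) latest version, so the first lookup for a
+  // given key always computes.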
+
+  private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
+      String nodePartition, Resource clusterResource,
+      SchedulingMode schedulingMode, boolean activeMode) {
+
+    // Preselect the stored map for the active-user or all-user computation.
+    Map<String, Map<SchedulingMode, Resource>> computedMap = (activeMode)
+        ? preComputedActiveUserLimit
+        : preComputedAllUserLimit;
+
+    Map<SchedulingMode, Resource> userLimitPerSchedulingMode = computedMap
+        .get(nodePartition);
+
+    if (userLimitPerSchedulingMode == null) {
+      userLimitPerSchedulingMode = new ConcurrentHashMap<>();
+      computedMap.put(nodePartition, userLimitPerSchedulingMode);
+    }
+
+    // Compute the user-limit for this scheduling mode.
+    Resource computedUserLimit = computeUserLimit(userName, clusterResource,
+        nodePartition, schedulingMode, activeMode);
+
+    // Update local storage.
+    userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
+
+    return userLimitPerSchedulingMode;
+  }
+
+  private Resource computeUserLimit(String userName, Resource clusterResource,
+      String nodePartition, SchedulingMode schedulingMode,
+      boolean activeUser) {
+    Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
+
+    /*
+     * What is our current capacity?
+     * * It is equal to the max(required, queue-capacity) if we're running
+     *   below capacity. The 'max' ensures that jobs in queues with minuscule
+     *   capacity (< 1 slot) make progress.
+     * * If we're running over capacity, then it's (usedResources + required),
+     *   i.e. we account for the extra resources we are allocating.
+     */
+    Resource queueCapacity = Resources.multiplyAndNormalizeUp(
+        resourceCalculator, partitionResource,
+        lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
+        lQueue.getMinimumAllocation());
+
+    /*
+     * Assume the required resource equals minimumAllocation; this ensures the
+     * user-limit can keep growing until queueMaxResource is reached.
+     */
+    Resource required = lQueue.getMinimumAllocation();
+
+    // Allow progress for queues with minuscule capacity.
+    queueCapacity = Resources.max(resourceCalculator, partitionResource,
+        queueCapacity, required);
+
+    /*
+     * We want to base the userLimit calculation on max(queueCapacity,
+     * usedResources + required). However, we want usedResources to be based
+     * on the combined ratios of all the users in the queue, so we use
+     * consumedRatio to compute it. The calculation depends on how the
+     * resourceCalculator computes the ratio between two Resources. DRF
+     * example: if usedResources is greater than queueCapacity and users have
+     * the following [mem, cpu] usages:
+     *   User1: [10%, 20%] - dominant resource is 20%
+     *   User2: [30%, 10%] - dominant resource is 30%
+     * then the total consumedRatio is 20% + 30% = 50%. Yes, this value can be
+     * larger than 100%, but for the purpose of making sure all users get
+     * their fair share, it works.
+     */
+    Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
+        partitionResource, getUsageRatio(nodePartition),
+        lQueue.getMinimumAllocation());
+    Resource currentCapacity = Resources.lessThan(resourceCalculator,
+        partitionResource, consumed, queueCapacity)
+            ? queueCapacity
+            : Resources.add(consumed, required);
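+    // Editorial note (not part of the original patch): a worked instance of
+    // the DRF example above, with assumed numbers. Suppose the partition has
+    // [1000 GB, 1000 vcores] and queueCapacity is 400 GB. With
+    //   User1 using [100 GB, 200 vcores] -> dominant share 0.20
+    //   User2 using [300 GB, 100 vcores] -> dominant share 0.30
+    // the queue's usage ratio is 0.20 + 0.30 = 0.50, so consumed normalizes
+    // to roughly 0.50 * 1000 GB = 500 GB. Since 500 GB is not less than
+    // queueCapacity (400 GB), currentCapacity becomes
+    // consumed + required = 500 GB + minimumAllocation.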
+
+    /*
+     * Never allow a single user to take more than the queue's configured
+     * capacity * user-limit-factor. Also, the queue's configured capacity
+     * should be higher than queue-hard-limit * ulMin.
+     */
+    int usersCount = getNumActiveUsers();
+    Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
+
+    // For the all-users (non-active) calculation, consider every known user.
+    if (!activeUser) {
+      resourceUsed = currentCapacity;
+      usersCount = users.size();
+    }
+
+    /*
+     * The user-limit resource is determined by:
+     * max(currentCapacity / #activeUsers,
+     *     currentCapacity * user-limit-percentage%)
+     */
+    Resource userLimitResource = Resources.max(resourceCalculator,
+        partitionResource,
+        Resources.divideAndCeil(resourceCalculator, resourceUsed,
+            usersCount),
+        Resources.divideAndCeil(resourceCalculator,
+            Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
+            100));
+
+    // User limit is capped by maxUserLimit:
+    // - maxUserLimit = queueCapacity * user-limit-factor
+    //   (RESPECT_PARTITION_EXCLUSIVITY)
+    // - maxUserLimit = total-partition-resource
+    //   (IGNORE_PARTITION_EXCLUSIVITY)
+    //
+    // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
+    // partition, its guaranteed resource on that partition is 0, and the
+    // user-limit-factor computation is based on the queue's guaranteed
+    // capacity. So we do not cap the user-limit, nor the used resource, when
+    // doing IGNORE_PARTITION_EXCLUSIVITY allocation.
+    Resource maxUserLimit = Resources.none();
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
+          getUserLimitFactor());
+    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      maxUserLimit = partitionResource;
+    }
+
+    // Cap the final user-limit with maxUserLimit.
+    userLimitResource = Resources
+        .roundUp(resourceCalculator,
+            Resources.min(resourceCalculator, partitionResource,
+                userLimitResource, maxUserLimit),
+            lQueue.getMinimumAllocation());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User limit computation for " + userName + " in queue "
+          + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
+          + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
+          + required + " consumed: " + consumed + " user-limit-resource: "
+          + userLimitResource + " queueCapacity: " + queueCapacity
+          + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
+          + " currentCapacity: " + currentCapacity + " activeUsers: "
+          + usersCount + " clusterCapacity: " + clusterResource
+          + " resourceByLabel: " + partitionResource + " usageratio: "
+          + getUsageRatio(nodePartition) + " Partition: " + nodePartition);
+    }
+    getUser(userName).setUserResourceLimit(userLimitResource);
+    return userLimitResource;
+  }
+
+  /**
+   * Update the usage ratios of all users for the given partition.
+   *
+   * @param partition
+   *          Node partition
+   * @param clusterResource
+   *          Cluster Resource
+   */
+  public void updateUsageRatio(String partition, Resource clusterResource) {
+    try {
+      writeLock.lock();
+      Resource resourceByLabel = labelManager.getResourceByLabel(partition,
+          clusterResource);
+      float consumed = 0;
+      User user;
+      for (Map.Entry<String, User> entry : getUsers().entrySet()) {
+        user = entry.getValue();
+        consumed += user.setAndUpdateUsageRatio(resourceCalculator,
+            resourceByLabel, partition);
+      }
+
+      qUsageRatios.setUsageRatio(partition, consumed);
+    } finally {
+      writeLock.unlock();
+    }
+  }
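+  // Editorial note (not part of the original patch): a worked instance of
+  // computeUserLimit() above, with assumed numbers, RESPECT mode, memory
+  // only. Let currentCapacity = 100 GB, 2 active users holding 80 GB in
+  // total, user-limit-percent = 25 and user-limit-factor = 1:
+  //
+  //   userLimitResource = max(80 GB / 2, 100 GB * 25 / 100)
+  //                     = max(40 GB, 25 GB) = 40 GB
+  //   maxUserLimit      = queueCapacity * 1
+  //
+  // so each user may hold up to 40 GB, rounded up to a multiple of the
+  // minimum allocation and capped by maxUserLimit.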
+
+  /*
+   * Increment the queue usage ratio.
+   */
+  private void incQueueUsageRatio(String nodePartition, float delta) {
+    qUsageRatios.incUsageRatio(nodePartition, delta);
+  }
+
+  @Override
+  public void activateApplication(String user, ApplicationId applicationId) {
+    try {
+      this.writeLock.lock();
+
+      Set<ApplicationId> userApps = usersApplications.get(user);
+      if (userApps == null) {
+        userApps = new HashSet<ApplicationId>();
+        usersApplications.put(user, userApps);
+        activeUsers.incrementAndGet();
+        metrics.incrActiveUsers();
+
+        // A user is added to the active list. Invalidate the user-limit
+        // cache.
+        userLimitNeedsRecompute();
+        updateActiveUsersResourceUsage(user);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User " + user + " added to activeUsers, currently: "
+              + activeUsers);
+        }
+      }
+      if (userApps.add(applicationId)) {
+        metrics.activateApp(user);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void deactivateApplication(String user, ApplicationId applicationId) {
+    try {
+      this.writeLock.lock();
+
+      Set<ApplicationId> userApps = usersApplications.get(user);
+      if (userApps != null) {
+        if (userApps.remove(applicationId)) {
+          metrics.deactivateApp(user);
+        }
+        if (userApps.isEmpty()) {
+          usersApplications.remove(user);
+          activeUsers.decrementAndGet();
+          metrics.decrActiveUsers();
+
+          // A user is removed from the active list. Invalidate the
+          // user-limit cache.
+          userLimitNeedsRecompute();
+          updateNonActiveUsersResourceUsage(user);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("User " + user
+                + " removed from activeUsers, currently: " + activeUsers);
+          }
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  @Override
+  public int getNumActiveUsers() {
+    return activeUsers.get();
+  }
+
+  private void updateActiveUsersResourceUsage(String userName) {
+    try {
+      this.writeLock.lock();
+
+      // For unit tests: the user may need to be added to the users list
+      // first.
+      User user = getUserAndAddIfAbsent(userName);
+      ResourceUsage resourceUsage = user.getResourceUsage();
+      // When a user moves to the active list, move its resource usage from
+      // the non-active to the active total.
+      if (nonActiveUsersSet.contains(userName)) {
+        nonActiveUsersSet.remove(userName);
+        activeUsersSet.add(userName);
+
+        // Update the total resource usage of active and non-active users
+        // after the user moves from non-active to active.
+        for (String partition : resourceUsage.getNodePartitionsSet()) {
+          totalResUsageForNonActiveUsers.decUsed(partition,
+              resourceUsage.getUsed(partition));
+          totalResUsageForActiveUsers.incUsed(partition,
+              resourceUsage.getUsed(partition));
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User '" + userName
+              + "' has become active. Moving the user to the active list."
+              + " Active users size = " + activeUsersSet.size()
+              + ", non-active users size = " + nonActiveUsersSet.size()
+              + ", total resource usage for active users = "
+              + totalResUsageForActiveUsers.getAllUsed()
+              + ", total resource usage for non-active users = "
+              + totalResUsageForNonActiveUsers.getAllUsed());
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
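+  // Editorial note (not part of the original patch): the two totals are kept
+  // complementary. For every user and partition, the user's usage is counted
+  // in exactly one of totalResUsageForActiveUsers or
+  // totalResUsageForNonActiveUsers, and the move methods above and below
+  // transfer the whole per-partition usage under the write lock. This lets
+  // computeUserLimit() read the active-users total for a partition without
+  // iterating over every user.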
+
+  private void updateNonActiveUsersResourceUsage(String userName) {
+    try {
+      this.writeLock.lock();
+
+      // For unit tests: the user may need to be added to the users list
+      // first.
+      User user = getUserAndAddIfAbsent(userName);
+      ResourceUsage resourceUsage = user.getResourceUsage();
+      // When a user moves to the non-active list, move its resource usage
+      // from the active to the non-active total.
+      if (activeUsersSet.contains(userName)) {
+        activeUsersSet.remove(userName);
+        nonActiveUsersSet.add(userName);
+
+        // Update the total resource usage of active and non-active users
+        // after the user moves from active to non-active.
+        for (String partition : resourceUsage.getNodePartitionsSet()) {
+          totalResUsageForActiveUsers.decUsed(partition,
+              resourceUsage.getUsed(partition));
+          totalResUsageForNonActiveUsers.incUsed(partition,
+              resourceUsage.getUsed(partition));
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User '" + userName
+              + "' has become non-active. Moving the user to the non-active"
+              + " list. Active users size = " + activeUsersSet.size()
+              + ", non-active users size = " + nonActiveUsersSet.size()
+              + ", total resource usage for active users = "
+              + totalResUsageForActiveUsers.getAllUsed()
+              + ", total resource usage for non-active users = "
+              + totalResUsageForNonActiveUsers.getAllUsed());
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private ResourceUsage getTotalResourceUsagePerUser(String userName) {
+    if (nonActiveUsersSet.contains(userName)) {
+      return totalResUsageForNonActiveUsers;
+    } else if (activeUsersSet.contains(userName)) {
+      return totalResUsageForActiveUsers;
+    } else {
+      LOG.warn("User '" + userName
+          + "' is present in neither the active nor the non-active set. This"
+          + " is highly unlikely; treating the user as non-active.");
+      return totalResUsageForNonActiveUsers;
+    }
+  }
+
+  /**
+   * During container allocate/release, ensure that all user-specific data
+   * structures are updated.
+   *
+   * @param userName
+   *          Name of the user
+   * @param resource
+   *          Resource to increment/decrement
+   * @param nodePartition
+   *          Node label
+   * @param isAllocate
+   *          Indicates whether to allocate or release the resource
+   * @return user
+   */
+  public User updateUserResourceUsage(String userName, Resource resource,
+      String nodePartition, boolean isAllocate) {
+    try {
+      this.writeLock.lock();
+
+      // TODO: should use getUser; this method is used just to avoid a UT
+      // failure caused by a wrong invocation order. Will fix the UT
+      // separately.
+      User user = getUserAndAddIfAbsent(userName);
+
+      // A new container is allocated. Invalidate the user-limit.
+      updateResourceUsagePerUser(user, resource, nodePartition, isAllocate);
+
+      userLimitNeedsRecompute();
+
+      // Update usage ratios.
+      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+          scheduler.getClusterResource());
+      incQueueUsageRatio(nodePartition, user.updateUsageRatio(
+          resourceCalculator, resourceByLabel, nodePartition));
+
+      return user;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  private void updateResourceUsagePerUser(User user, Resource resource,
+      String nodePartition, boolean isAllocate) {
+    ResourceUsage totalResourceUsageForUsers = getTotalResourceUsagePerUser(
+        user.userName);
+
+    if (isAllocate) {
+      user.getResourceUsage().incUsed(nodePartition, resource);
+      totalResourceUsageForUsers.incUsed(nodePartition, resource);
+    } else {
+      user.getResourceUsage().decUsed(nodePartition, resource);
+      totalResourceUsageForUsers.decUsed(nodePartition, resource);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User resource is updated."
+          + " Total resource usage for active users = "
+          + totalResUsageForActiveUsers.getAllUsed() + ", "
+ + "Total Resource usage for non-active users=" + + totalResUsageForNonActiveUsers.getAllUsed()); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 36353985100..ee5de60483d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; @@ -115,24 +115,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { new ConcurrentHashMap<>(); public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, + String user, Queue queue, AbstractUsersManager abstractUsersManager, RMContext rmContext) { - this(applicationAttemptId, user, queue, activeUsersManager, rmContext, + this(applicationAttemptId, user, queue, abstractUsersManager, rmContext, Priority.newInstance(0), false); } public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, + String user, Queue queue, AbstractUsersManager abstractUsersManager, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { - this(applicationAttemptId, user, queue, activeUsersManager, rmContext, + this(applicationAttemptId, user, queue, abstractUsersManager, rmContext, appPriority, isAttemptRecovering, null); } public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, + String user, Queue queue, AbstractUsersManager abstractUsersManager, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering, ActivitiesManager activitiesManager) { - super(applicationAttemptId, user, queue, activeUsersManager, rmContext); + super(applicationAttemptId, user, queue, abstractUsersManager, rmContext); RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 0fad8be5de2..855b8f7e97c 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -488,7 +488,7 @@ public int getNumActiveApps() { } @Override - public ActiveUsersManager getActiveUsersManager() { + public ActiveUsersManager getAbstractUsersManager() { return activeUsersManager; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 3bc81ac588f..5b4e4dc9e5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -281,7 +281,7 @@ public void collectSchedulerApplications( } @Override - public ActiveUsersManager getActiveUsersManager() { + public ActiveUsersManager getAbstractUsersManager() { // Should never be called since all applications are submitted to LeafQueues return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b90063d2294..61b527a6fd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -177,7 +177,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { } @Override - public ActiveUsersManager getActiveUsersManager() { + public ActiveUsersManager getAbstractUsersManager() { return activeUsersManager; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java index e0ac56fb889..7dcdf582225 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java @@ -63,7 +63,7 @@ public class CapacitySchedulerLeafQueueInfo 
extends CapacitySchedulerQueueInfo { maxApplications = q.getMaxApplications(); maxApplicationsPerUser = q.getMaxApplicationsPerUser(); userLimit = q.getUserLimit(); - users = new UsersInfo(q.getUsers()); + users = new UsersInfo(q.getUsersManager().getUsersInfo()); userLimitFactor = q.getUserLimitFactor(); AMResourceLimit = new ResourceInfo(q.getAMResourceLimit()); usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java index 32b2c68163a..dfab3b2dc6d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -362,9 +363,10 @@ private void mockApplications(String appsConfig) { queue.getQueueCapacities().getAbsoluteCapacity()); HashSet users = userMap.get(queue.getQueueName()); Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size()); - for (String user : users) { - when(queue.getUserLimitPerUser(eq(user), any(Resource.class), - anyString())).thenReturn(userLimit); + for (String userName : users) { + when(queue.getResourceLimitForAllUsers(eq(userName), + any(Resource.class), anyString(), any(SchedulingMode.class))) + .thenReturn(userLimit); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index ca73b6aa787..fa16effd25f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -73,7 +73,7 @@ public void testMove() { RMContext rmContext = mock(RMContext.class); 
when(rmContext.getEpoch()).thenReturn(3L); SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, - user, oldQueue, oldQueue.getActiveUsersManager(), rmContext); + user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext); oldMetrics.submitApp(user); // confirm that containerId is calculated based on epoch. @@ -169,7 +169,7 @@ private Queue createQueue(String name, Queue parent, float capacity) { ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics); Queue queue = mock(Queue.class); when(queue.getMetrics()).thenReturn(metrics); - when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); + when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager); when(queue.getQueueInfo(false, false)).thenReturn(queueInfo); return queue; } @@ -198,7 +198,7 @@ public void testAppPercentages() throws Exception { Queue queue = createQueue("test", null); SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, user, queue, - queue.getActiveUsersManager(), rmContext); + queue.getAbstractUsersManager(), rmContext); // Resource request Resource requestedResource = Resource.newInstance(1536, 2); @@ -211,7 +211,7 @@ public void testAppPercentages() throws Exception { queue = createQueue("test2", null, 0.5f); app = new SchedulerApplicationAttempt(appAttId, user, queue, - queue.getActiveUsersManager(), rmContext); + queue.getAbstractUsersManager(), rmContext); app.attemptResourceUsage.incUsed(requestedResource); assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(), 0.01f); @@ -229,7 +229,7 @@ public void testAppPercentages() throws Exception { queue = createQueue("test3", null, 0.0f); app = new SchedulerApplicationAttempt(appAttId, user, queue, - queue.getActiveUsersManager(), rmContext); + queue.getAbstractUsersManager(), rmContext); // Resource request app.attemptResourceUsage.incUsed(requestedResource); @@ -255,7 +255,7 @@ public void testAppPercentagesOnswitch() throws Exception { final String user = "user1"; Queue queue = createQueue("test", null); SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, - user, queue, queue.getActiveUsersManager(), rmContext); + user, queue, queue.getAbstractUsersManager(), rmContext); // Resource request Resource requestedResource = Resource.newInstance(1536, 2); @@ -274,7 +274,7 @@ public void testSchedulingOpportunityOverflow() throws Exception { RMContext rmContext = mock(RMContext.class); when(rmContext.getEpoch()).thenReturn(3L); SchedulerApplicationAttempt app = new SchedulerApplicationAttempt( - attemptId, "user", queue, queue.getActiveUsersManager(), rmContext); + attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext); Priority priority = Priority.newInstance(1); SchedulerRequestKey schedulerKey = toSchedulerKey(priority); assertEquals(0, app.getSchedulingOpportunities(schedulerKey)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 37b0da8d6d8..e9b1f9d795a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -193,7 +193,7 @@ public void testAMResourceLimit() throws Exception {
         clusterResource));
     ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
-    when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+    when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
     assertEquals(Resource.newInstance(8 * GB, 1),
         queue.calculateAndGetAMResourceLimit());
@@ -634,7 +634,7 @@ public void testHeadroom() throws Exception {
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
         appAttemptId_0_0, user_0, queue,
-        queue.getActiveUsersManager(), spyRMContext);
+        queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_0, user_0);
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -646,7 +646,7 @@
     // Schedule to compute
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
+    Resource expectedHeadroom = Resources.createResource(5*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     // Submit second application from user_0, check headroom
@@ -654,7 +654,7 @@
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
         appAttemptId_0_1, user_0, queue,
-        queue.getActiveUsersManager(), spyRMContext);
+        queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_1, user_0);
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -674,7 +674,7 @@
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
         appAttemptId_1_0, user_1, queue,
-        queue.getActiveUsersManager(), spyRMContext);
+        queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_1_0, user_1);
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@@ -693,6 +693,11 @@
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
+
+    // Any change in cluster resource needs to force a user-limit
+    // recomputation. In the existing code, LeafQueue#updateClusterResource
+    // handled this; however, that method is not used here.
+ queue.getUsersManager().userLimitNeedsRecompute(); queue.assignContainers(clusterResource, node_0, new ResourceLimits( clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java index 547571e3a5b..0aac2ef23da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java @@ -659,7 +659,7 @@ public void testHeadroom() throws Exception { final ApplicationAttemptId appAttemptId_0_0 = TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0, - queue, queue.getActiveUsersManager(), spyRMContext); + queue, queue.getAbstractUsersManager(), spyRMContext); queue.submitApplicationAttempt(app_0_0, user_0); List app_0_0_requests = new ArrayList(); @@ -671,16 +671,16 @@ public void testHeadroom() throws Exception { queue.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - //head room = queue capacity = 50 % 90% 160 GB + //head room = queue capacity = 50 % 90% 160 GB * 0.25 (UL) Resource expectedHeadroom = - Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1); + Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1); assertEquals(expectedHeadroom, app_0_0.getHeadroom()); // Submit second application from user_0, check headroom final ApplicationAttemptId appAttemptId_0_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0, - queue, queue.getActiveUsersManager(), spyRMContext); + queue, queue.getAbstractUsersManager(), spyRMContext); queue.submitApplicationAttempt(app_0_1, user_0); List app_0_1_requests = new ArrayList(); @@ -703,15 +703,16 @@ public void testHeadroom() throws Exception { assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change //head room for default label + head room for y partition //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB - Resource expectedHeadroomWithReqInY = - Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom); + Resource expectedHeadroomWithReqInY = Resources.add( + Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1), + expectedHeadroom); assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1, - queue, queue.getActiveUsersManager(), spyRMContext); + queue, queue.getAbstractUsersManager(), spyRMContext); queue.submitApplicationAttempt(app_1_0, user_1); List app_1_0_requests = new 
ArrayList(); @@ -730,12 +731,12 @@ public void testHeadroom() throws Exception { SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute //head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users) expectedHeadroom = - Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1); + Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1); //head room for default label + head room for y partition //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB - expectedHeadroomWithReqInY = - Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1), - expectedHeadroom); + expectedHeadroomWithReqInY = Resources.add( + Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1), + expectedHeadroom); assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java index 8c850de7115..b4ebd15ccde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java @@ -186,7 +186,7 @@ private void checkUserUsedResource(MockRM rm, String queueName, String userName, String partition, int memory) { CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName); - LeafQueue.User user = queue.getUser(userName); + UsersManager.User user = queue.getUser(userName); Assert.assertEquals(memory, user.getResourceUsage().getUsed(partition).getMemorySize()); } @@ -243,7 +243,7 @@ public RMNodeLabelsManager createNodeLabelManager() { LeafQueue queue = (LeafQueue) ((CapacityScheduler) rm.getResourceScheduler()) .getQueue("a"); - ArrayList users = queue.getUsers(); + ArrayList users = queue.getUsersManager().getUsersInfo(); for (UserInfo userInfo : users) { if (userInfo.getUsername().equals("user")) { ResourceInfo resourcesUsed = userInfo.getResourcesUsed(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index f572ea3c328..7ad4f7de392 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -69,6 +69,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -77,10 +78,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - - +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -523,19 +522,22 @@ public void testSingleQueueWithOneUser() throws Exception { // Users final String user_0 = "user_0"; + // Active Users Manager + AbstractUsersManager activeUserManager = a.getAbstractUsersManager(); + // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - mock(ActiveUsersManager.class), spyRMContext); + activeUserManager, spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, - mock(ActiveUsersManager.class), spyRMContext); + activeUserManager, spyRMContext); a.submitApplicationAttempt(app_1, user_0); // same user @@ -684,7 +686,7 @@ public void testDRFUsageRatioRounding() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app0 = new FiCaSchedulerApp(appAttemptId0, user0, b, - b.getActiveUsersManager(), spyRMContext); + b.getAbstractUsersManager(), spyRMContext); b.submitApplicationAttempt(app0, user0); // Setup some nodes @@ -748,14 +750,14 @@ public void testDRFUserLimits() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app0 = new FiCaSchedulerApp(appAttemptId0, user0, b, - b.getActiveUsersManager(), spyRMContext); + b.getAbstractUsersManager(), spyRMContext); b.submitApplicationAttempt(app0, user0); final ApplicationAttemptId appAttemptId2 = TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app2 = new FiCaSchedulerApp(appAttemptId2, user1, b, - b.getActiveUsersManager(), spyRMContext); + b.getAbstractUsersManager(), spyRMContext); b.submitApplicationAttempt(app2, user1); // Setup some nodes @@ -776,6 +778,7 @@ public void testDRFUserLimits() throws Exception { Resource clusterResource = Resources.createResource(numNodes * (8 * GB), numNodes * 100); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + when(csContext.getClusterResource()).thenReturn(clusterResource); // Setup resource-requests so that one application is memory dominant // and other 
application is vcores dominant @@ -799,7 +802,7 @@ public void testDRFUserLimits() throws Exception { User queueUser1 = b.getUser(user1); assertEquals("There should 2 active users!", 2, b - .getActiveUsersManager().getNumActiveUsers()); + .getAbstractUsersManager().getNumActiveUsers()); // Fill both Nodes as far as we can CSAssignment assign; do { @@ -834,7 +837,7 @@ public void testDRFUserLimits() throws Exception { / (numNodes * 100.0f) + queueUser1.getUsed().getMemorySize() / (numNodes * 8.0f * GB); - assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); + assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001); // Add another node and make sure consumedRatio is adjusted // accordingly. numNodes = 3; @@ -848,7 +851,7 @@ public void testDRFUserLimits() throws Exception { / (numNodes * 100.0f) + queueUser1.getUsed().getMemorySize() / (numNodes * 8.0f * GB); - assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); + assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001); } @Test @@ -858,6 +861,9 @@ public void testUserLimits() throws Exception { //unset maxCapacity a.setMaxCapacity(1.0f); + when(csContext.getClusterResource()) + .thenReturn(Resources.createResource(16 * GB, 32)); + // Users final String user_0 = "user_0"; final String user_1 = "user_1"; @@ -867,14 +873,14 @@ public void testUserLimits() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - a.getActiveUsersManager(), spyRMContext); + a.getAbstractUsersManager(), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_1, a, - a.getActiveUsersManager(), spyRMContext); + a.getAbstractUsersManager(), spyRMContext); a.submitApplicationAttempt(app_1, user_1); // different user // Setup some nodes @@ -913,7 +919,7 @@ public void testUserLimits() throws Exception { a.setUserLimitFactor(2); // There're two active users - assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); + assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers()); // 1 container to user_0 applyCSAssignment(clusterResource, @@ -948,7 +954,7 @@ public void testUserLimits() throws Exception { // app_0 doesn't have outstanding resources, there's only one active user. 
assertEquals("There should only be 1 active user!", - 1, a.getActiveUsersManager().getNumActiveUsers()); + 1, a.getAbstractUsersManager().getNumActiveUsers()); } @@ -999,7 +1005,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, qb, - qb.getActiveUsersManager(), spyRMContext); + qb.getAbstractUsersManager(), spyRMContext); Map apps = new HashMap<>(); apps.put(app_0.getApplicationAttemptId(), app_0); qb.submitApplicationAttempt(app_0, user_0); @@ -1010,7 +1016,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { u0Priority, recordFactory))); assertEquals("There should only be 1 active user!", - 1, qb.getActiveUsersManager().getNumActiveUsers()); + 1, qb.getAbstractUsersManager().getNumActiveUsers()); //get headroom applyCSAssignment(clusterResource, qb.assignContainers(clusterResource, node_0, @@ -1027,7 +1033,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { TestUtils.getMockApplicationAttemptId(2, 0); FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, qb, - qb.getActiveUsersManager(), spyRMContext); + qb.getAbstractUsersManager(), spyRMContext); apps.put(app_2.getApplicationAttemptId(), app_2); Priority u1Priority = TestUtils.createMockPriority(2); SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority); @@ -1065,13 +1071,13 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { TestUtils.getMockApplicationAttemptId(1, 0); FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, qb, - qb.getActiveUsersManager(), spyRMContext); + qb.getAbstractUsersManager(), spyRMContext); apps.put(app_1.getApplicationAttemptId(), app_1); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); FiCaSchedulerApp app_3 = new FiCaSchedulerApp(appAttemptId_3, user_1, qb, - qb.getActiveUsersManager(), spyRMContext); + qb.getAbstractUsersManager(), spyRMContext); apps.put(app_3.getApplicationAttemptId(), app_3); app_1.updateResourceRequests(Collections.singletonList( TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, @@ -1100,7 +1106,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { TestUtils.getMockApplicationAttemptId(4, 0); FiCaSchedulerApp app_4 = new FiCaSchedulerApp(appAttemptId_4, user_0, qb, - qb.getActiveUsersManager(), spyRMContext); + qb.getAbstractUsersManager(), spyRMContext); apps.put(app_4.getApplicationAttemptId(), app_4); qb.submitApplicationAttempt(app_4, user_0); app_4.updateResourceRequests(Collections.singletonList( @@ -1123,9 +1129,9 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException { //testcase3 still active - 2+2+6=10 assertEquals(10*GB, qb.getUsedResources().getMemorySize()); //app4 is user 0 - //maxqueue 16G, userlimit 13G, used 8G, headroom 5G + //maxqueue 16G, userlimit 7G, used 8G, headroom 5G //(8G used is 6G from this test case - app4, 2 from last test case, app_1) - assertEquals(5*GB, app_4.getHeadroom().getMemorySize()); + assertEquals(0*GB, app_4.getHeadroom().getMemorySize()); } @Test @@ -1144,21 +1150,21 @@ public void testUserHeadroomMultiApp() throws Exception { TestUtils.getMockApplicationAttemptId(0, 0); FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, - a.getActiveUsersManager(), spyRMContext); + a.getAbstractUsersManager(), spyRMContext); a.submitApplicationAttempt(app_0, user_0); final ApplicationAttemptId 
         appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); // same user

     final ApplicationAttemptId appAttemptId_2 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_2, user_1);

     // Setup some nodes
@@ -1244,21 +1250,21 @@ public void testHeadroomWithMaxCap() throws Exception {
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); // same user

     final ApplicationAttemptId appAttemptId_2 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_2, user_1);

     // Setup some nodes
@@ -1298,7 +1304,7 @@ public void testHeadroomWithMaxCap() throws Exception {
     // Now, only user_0 should be active since he is the only one with
     // outstanding requests
     assertEquals("There should only be 1 active user!",
-        1, a.getActiveUsersManager().getNumActiveUsers());
+        1, a.getAbstractUsersManager().getNumActiveUsers());

     // 1 container to user_0
     applyCSAssignment(clusterResource,
@@ -1309,8 +1315,8 @@ public void testHeadroomWithMaxCap() throws Exception {
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
     // TODO, fix headroom in the future patch
-    assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
-    // User limit = 4G, 2 in use
+    assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
+    // User limit = 2G, 2 in use
     assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
     // the application is not yet active

@@ -1322,15 +1328,15 @@ public void testHeadroomWithMaxCap() throws Exception {
     assertEquals(3*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
-    assertEquals(1*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G
-    assertEquals(1*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
+    assertEquals(0*GB, app_0.getHeadroom().getMemorySize()); // 3G - 3G
+    assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // 3G - 3G

     // Submit requests for app_1 and set max-cap
     a.setMaxCapacity(.1f);
     app_2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
             priority, recordFactory)));
-    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());

     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
@@ -1349,7 +1355,7 @@ public void testHeadroomWithMaxCap() throws Exception {
     app_1.updateResourceRequests(Collections.singletonList( // unset
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
-    assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+    assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers());
     applyCSAssignment(clusterResource,
         a.assignContainers(clusterResource, node_1,
             new ResourceLimits(clusterResource),
@@ -1375,28 +1381,28 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);

     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0); // same user

     final ApplicationAttemptId appAttemptId_2 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_2, user_1);

     final ApplicationAttemptId appAttemptId_3 =
         TestUtils.getMockApplicationAttemptId(3, 0);
     FiCaSchedulerApp app_3 =
         new FiCaSchedulerApp(appAttemptId_3, user_2, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_3, user_2);

     Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
@@ -1414,7 +1420,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
     Resource clusterResource =
         Resources.createResource(numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
-
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
@@ -1741,6 +1748,7 @@ public void testReservationExchange() throws Exception {
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getClusterResource()).thenReturn(Resource.newInstance(8, 1));

     Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
         app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
@@ -3804,7 +3812,7 @@ public void testApplicationQueuePercent()
     final String user = "user1";
     FiCaSchedulerApp app = new FiCaSchedulerApp(appAttId, user, queue,
-        queue.getActiveUsersManager(), rmContext);
+        queue.getAbstractUsersManager(), rmContext);

     // Resource request
     Resource requestedResource = Resource.newInstance(1536, 2);
@@ -3819,7 +3827,7 @@ public void testApplicationQueuePercent()
     // child of root, its absolute capaicty is also 50%.
     queue = createQueue("test2", null, 0.5f, 0.5f);
     app = new FiCaSchedulerApp(appAttId, user, queue,
-        queue.getActiveUsersManager(), rmContext);
+        queue.getAbstractUsersManager(), rmContext);
     app.getAppAttemptResourceUsage().incUsed(requestedResource);
     // In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
     assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@@ -3831,7 +3839,7 @@ public void testApplicationQueuePercent()
     // Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
     AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
     app = new FiCaSchedulerApp(appAttId, user, qChild,
-        qChild.getActiveUsersManager(), rmContext);
+        qChild.getAbstractUsersManager(), rmContext);
     app.getAppAttemptResourceUsage().incUsed(requestedResource);
     // In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
     assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@@ -3855,7 +3863,7 @@ private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
     ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
     AbstractCSQueue queue = mock(AbstractCSQueue.class);
     when(queue.getMetrics()).thenReturn(metrics);
-    when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+    when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
     when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
     QueueCapacities qCaps = mock(QueueCapacities.class);
     when(qCaps.getAbsoluteCapacity((String) any())).thenReturn(absCap);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 32a90744e8d..740ef336629 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -1049,7 +1049,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);

-    // Each application request 5 * 1GB container
+    // Each application requests 50 * 1GB containers
     am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());

     // NM1 do 50 heartbeats
@@ -1169,12 +1169,14 @@ public RMNodeLabelsManager createNodeLabelManager() {
     csConf.setAccessibleNodeLabels(A, toSet("x"));
     csConf.setCapacityByLabel(A, "x", 50);
     csConf.setMaximumCapacityByLabel(A, "x", 50);
+    csConf.setUserLimit(A, 200);

     final String B = CapacitySchedulerConfiguration.ROOT + ".b";
     csConf.setCapacity(B, 50);
     csConf.setAccessibleNodeLabels(B, toSet("x"));
     csConf.setCapacityByLabel(B, "x", 50);
     csConf.setMaximumCapacityByLabel(B, "x", 50);
+    csConf.setUserLimit(B, 200);

     // set node -> label
     mgr.addToCluserNodeLabels(ImmutableSet.of(
@@ -1207,6 +1209,7 @@ public RMNodeLabelsManager createNodeLabelManager() {

     SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());

+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     for (int i = 0; i < 50; i++) {
       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     }
@@ -1250,7 +1253,7 @@ private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) {
       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     }
   }
-  
+
   private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
       throws InterruptedException {
     int totalWaitTick = 100; // wait 10 sec at most.