YARN-5889. Improve and refactor user-limit calculation in Capacity Scheduler. (Sunil G via wangda)

(cherry picked from commit 5fb723bb77722d41df6959eee23e1b0cfeb5584e)
This commit is contained in:
Wangda Tan 2017-02-09 10:23:50 -08:00 committed by Eric Payne
parent e6cdf770ca
commit f2d440b3b3
24 changed files with 1304 additions and 579 deletions

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -264,8 +265,9 @@ private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
// Verify whether we already calculated headroom for this user. // Verify whether we already calculated headroom for this user.
if (userLimitResource == null) { if (userLimitResource == null) {
userLimitResource = Resources.clone(tq.leafQueue userLimitResource = Resources.clone(
.getUserLimitPerUser(userName, partitionBasedResource, partition)); tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
Resource amUsed = perUserAMUsed.get(userName); Resource amUsed = perUserAMUsed.get(userName);
if (null == amUsed) { if (null == amUsed) {

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* {@link AbstractUsersManager} tracks users in the system.
*/
@Private
public interface AbstractUsersManager {
/**
* An application has new outstanding requests.
*
* @param user
* application user
* @param applicationId
* activated application
*/
void activateApplication(String user, ApplicationId applicationId);
/**
* An application has no more outstanding requests.
*
* @param user
* application user
* @param applicationId
* deactivated application
*/
void deactivateApplication(String user, ApplicationId applicationId);
/**
* Get number of active users i.e. users with applications which have pending
* resource requests.
*
* @return number of active users
*/
int getNumActiveUsers();
}

View File

@ -36,8 +36,8 @@
* An active user is defined as someone with outstanding resource requests. * An active user is defined as someone with outstanding resource requests.
*/ */
@Private @Private
public class ActiveUsersManager { public class ActiveUsersManager implements AbstractUsersManager {
private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class); private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);
private final QueueMetrics metrics; private final QueueMetrics metrics;
@ -45,7 +45,7 @@ public class ActiveUsersManager {
private int activeUsers = 0; private int activeUsers = 0;
private Map<String, Set<ApplicationId>> usersApplications = private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>(); new HashMap<String, Set<ApplicationId>>();
public ActiveUsersManager(QueueMetrics metrics) { public ActiveUsersManager(QueueMetrics metrics) {
this.metrics = metrics; this.metrics = metrics;
} }
@ -57,6 +57,7 @@ public ActiveUsersManager(QueueMetrics metrics) {
* @param applicationId activated application * @param applicationId activated application
*/ */
@Lock({Queue.class, SchedulerApplicationAttempt.class}) @Lock({Queue.class, SchedulerApplicationAttempt.class})
@Override
synchronized public void activateApplication( synchronized public void activateApplication(
String user, ApplicationId applicationId) { String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user); Set<ApplicationId> userApps = usersApplications.get(user);
@ -65,8 +66,10 @@ synchronized public void activateApplication(
usersApplications.put(user, userApps); usersApplications.put(user, userApps);
++activeUsers; ++activeUsers;
metrics.incrActiveUsers(); metrics.incrActiveUsers();
LOG.debug("User " + user + " added to activeUsers, currently: " + if (LOG.isDebugEnabled()) {
activeUsers); LOG.debug("User " + user + " added to activeUsers, currently: "
+ activeUsers);
}
} }
if (userApps.add(applicationId)) { if (userApps.add(applicationId)) {
metrics.activateApp(user); metrics.activateApp(user);
@ -80,6 +83,7 @@ synchronized public void activateApplication(
* @param applicationId deactivated application * @param applicationId deactivated application
*/ */
@Lock({Queue.class, SchedulerApplicationAttempt.class}) @Lock({Queue.class, SchedulerApplicationAttempt.class})
@Override
synchronized public void deactivateApplication( synchronized public void deactivateApplication(
String user, ApplicationId applicationId) { String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user); Set<ApplicationId> userApps = usersApplications.get(user);
@ -91,18 +95,21 @@ synchronized public void deactivateApplication(
usersApplications.remove(user); usersApplications.remove(user);
--activeUsers; --activeUsers;
metrics.decrActiveUsers(); metrics.decrActiveUsers();
LOG.debug("User " + user + " removed from activeUsers, currently: " + if (LOG.isDebugEnabled()) {
activeUsers); LOG.debug("User " + user + " removed from activeUsers, currently: "
+ activeUsers);
}
} }
} }
} }
/** /**
* Get number of active users i.e. users with applications which have pending * Get number of active users i.e. users with applications which have pending
* resource requests. * resource requests.
* @return number of active users * @return number of active users
*/ */
@Lock({Queue.class, SchedulerApplicationAttempt.class}) @Lock({Queue.class, SchedulerApplicationAttempt.class})
@Override
synchronized public int getNumActiveUsers() { synchronized public int getNumActiveUsers() {
return activeUsers; return activeUsers;
} }

View File

@ -66,7 +66,7 @@ public class AppSchedulingInfo {
private final String user; private final String user;
private Queue queue; private Queue queue;
private ActiveUsersManager activeUsersManager; private AbstractUsersManager abstractUsersManager;
// whether accepted/allocated by scheduler // whether accepted/allocated by scheduler
private volatile boolean pending = true; private volatile boolean pending = true;
private ResourceUsage appResourceUsage; private ResourceUsage appResourceUsage;
@ -90,13 +90,13 @@ public class AppSchedulingInfo {
public final ContainerUpdateContext updateContext; public final ContainerUpdateContext updateContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId, public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, AbstractUsersManager abstractUsersManager,
long epoch, ResourceUsage appResourceUsage) { long epoch, ResourceUsage appResourceUsage) {
this.applicationAttemptId = appAttemptId; this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId(); this.applicationId = appAttemptId.getApplicationId();
this.queue = queue; this.queue = queue;
this.user = user; this.user = user;
this.activeUsersManager = activeUsersManager; this.abstractUsersManager = abstractUsersManager;
this.containerIdCounter = new AtomicLong( this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT); epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage; this.appResourceUsage = appResourceUsage;
@ -253,7 +253,7 @@ private void updatePendingResources(ResourceRequest lastRequest,
// Activate application. Metrics activation is done here. // Activate application. Metrics activation is done here.
if (lastRequestContainers <= 0) { if (lastRequestContainers <= 0) {
schedulerKeys.add(schedulerKey); schedulerKeys.add(schedulerKey);
activeUsersManager.activateApplication(user, applicationId); abstractUsersManager.activateApplication(user, applicationId);
} }
} }
@ -453,7 +453,7 @@ public List<ResourceRequest> allocate(NodeType type,
public void checkForDeactivation() { public void checkForDeactivation() {
if (schedulerKeys.isEmpty()) { if (schedulerKeys.isEmpty()) {
activeUsersManager.deactivateApplication(user, applicationId); abstractUsersManager.deactivateApplication(user, applicationId);
} }
} }
@ -483,9 +483,9 @@ public void move(Queue newQueue) {
} }
oldMetrics.moveAppFrom(this); oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this); newMetrics.moveAppTo(this);
activeUsersManager.deactivateApplication(user, applicationId); abstractUsersManager.deactivateApplication(user, applicationId);
activeUsersManager = newQueue.getActiveUsersManager(); abstractUsersManager = newQueue.getAbstractUsersManager();
activeUsersManager.activateApplication(user, applicationId); abstractUsersManager.activateApplication(user, applicationId);
this.queue = newQueue; this.queue = newQueue;
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();

View File

@ -63,7 +63,7 @@ public interface Queue {
boolean hasAccess(QueueACL acl, UserGroupInformation user); boolean hasAccess(QueueACL acl, UserGroupInformation user);
public ActiveUsersManager getActiveUsersManager(); public AbstractUsersManager getAbstractUsersManager();
/** /**
* Recover the state of the queue for a given container. * Recover the state of the queue for a given container.

View File

@ -200,13 +200,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger(); private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) { RMContext rmContext) {
Preconditions.checkNotNull(rmContext, "RMContext should not be null"); Preconditions.checkNotNull(rmContext, "RMContext should not be null");
this.rmContext = rmContext; this.rmContext = rmContext;
this.appSchedulingInfo = this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue, new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, rmContext.getEpoch(), attemptResourceUsage); abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.queue = queue; this.queue = queue;
this.pendingRelease = Collections.newSetFromMap( this.pendingRelease = Collections.newSetFromMap(
new ConcurrentHashMap<ContainerId, Boolean>()); new ConcurrentHashMap<ContainerId, Boolean>());

View File

@ -40,7 +40,7 @@
import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -240,10 +240,10 @@ public void updateClusterResource(Resource clusterResource,
ResourceLimits resourceLimits); ResourceLimits resourceLimits);
/** /**
* Get the {@link ActiveUsersManager} for the queue. * Get the {@link AbstractUsersManager} for the queue.
* @return the <code>ActiveUsersManager</code> for the queue * @return the <code>AbstractUsersManager</code> for the queue
*/ */
public ActiveUsersManager getActiveUsersManager(); public AbstractUsersManager getAbstractUsersManager();
/** /**
* Adds all applications in the queue and its subqueues to the given collection. * Adds all applications in the queue and its subqueues to the given collection.

View File

@ -26,12 +26,12 @@
public class CapacityHeadroomProvider { public class CapacityHeadroomProvider {
LeafQueue.User user; UsersManager.User user;
LeafQueue queue; LeafQueue queue;
FiCaSchedulerApp application; FiCaSchedulerApp application;
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo; LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue, public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue,
FiCaSchedulerApp application, FiCaSchedulerApp application,
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) { LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {

View File

@ -745,7 +745,7 @@ private void addApplicationAttempt(
CSQueue queue = (CSQueue) application.getQueue(); CSQueue queue = (CSQueue) application.getQueue();
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
application.getUser(), queue, queue.getActiveUsersManager(), application.getUser(), queue, queue.getAbstractUsersManager(),
rmContext, application.getPriority(), isAttemptRecovering, rmContext, application.getPriority(), isAttemptRecovering,
activitiesManager); activitiesManager);
if (transferStateFromPreviousAttempt) { if (transferStateFromPreviousAttempt) {

View File

@ -51,7 +51,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@ -63,6 +63,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@ -76,7 +77,6 @@
import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException; import java.io.IOException;
@ -101,8 +101,6 @@ public class LeafQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(LeafQueue.class); private static final Log LOG = LogFactory.getLog(LeafQueue.class);
private float absoluteUsedCapacity = 0.0f; private float absoluteUsedCapacity = 0.0f;
private volatile int userLimit;
private volatile float userLimitFactor;
protected int maxApplications; protected int maxApplications;
protected volatile int maxApplicationsPerUser; protected volatile int maxApplicationsPerUser;
@ -122,14 +120,12 @@ public class LeafQueue extends AbstractCSQueue {
private volatile float minimumAllocationFactor; private volatile float minimumAllocationFactor;
private Map<String, User> users = new ConcurrentHashMap<>();
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext scheduler; private CapacitySchedulerContext scheduler;
private final ActiveUsersManager activeUsersManager; private final UsersManager usersManager;
// cache last cluster resource to compute actual capacity // cache last cluster resource to compute actual capacity
private Resource lastClusterResource = Resources.none(); private Resource lastClusterResource = Resources.none();
@ -141,10 +137,6 @@ public class LeafQueue extends AbstractCSQueue {
private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null; private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
// Summation of consumed ratios for all users in queue
private float totalUserConsumedRatio = 0;
private UsageRatios qUsageRatios;
// record all ignore partition exclusivityRMContainer, this will be used to do // record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on // preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers = private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@ -159,13 +151,12 @@ public LeafQueue(CapacitySchedulerContext cs,
super(cs, queueName, parent, old); super(cs, queueName, parent, old);
this.scheduler = cs; this.scheduler = cs;
this.activeUsersManager = new ActiveUsersManager(metrics); this.usersManager = new UsersManager(metrics, this, labelManager, scheduler,
resourceCalculator);
// One time initialization is enough since it is static ordering policy // One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
qUsageRatios = new UsageRatios();
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath()); + ", fullname=" + getQueuePath());
@ -197,8 +188,8 @@ protected void setupQueueConfigs(Resource clusterResource)
setOrderingPolicy( setOrderingPolicy(
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath())); conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath()); usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
userLimitFactor = conf.getUserLimitFactor(getQueuePath()); usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath()); maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) { if (maxApplications < 0) {
@ -212,7 +203,8 @@ protected void setupQueueConfigs(Resource clusterResource)
} }
} }
maxApplicationsPerUser = Math.min(maxApplications, maxApplicationsPerUser = Math.min(maxApplications,
(int) (maxApplications * (userLimit / 100.0f) * userLimitFactor)); (int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
* usersManager.getUserLimitFactor()));
maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent( conf.getMaximumApplicationMasterResourcePerQueuePercent(
@ -271,8 +263,9 @@ protected void setupQueueConfigs(Resource clusterResource)
+ queueCapacities.getAbsoluteMaximumCapacity() + queueCapacities.getAbsoluteMaximumCapacity()
+ " [= 1.0 maximumCapacity undefined, " + " [= 1.0 maximumCapacity undefined, "
+ "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
+ "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]" + "\n" + "userLimit = " + usersManager.getUserLimit()
+ "\n" + "userLimitFactor = " + userLimitFactor + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
+ usersManager.getUserLimitFactor()
+ " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = " + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
+ maxApplications + maxApplications
+ " [= configuredMaximumSystemApplicationsPerQueue or" + " [= configuredMaximumSystemApplicationsPerQueue or"
@ -336,9 +329,17 @@ public int getMaxApplicationsPerUser() {
return maxApplicationsPerUser; return maxApplicationsPerUser;
} }
/**
*
* @return UsersManager instance.
*/
public UsersManager getUsersManager() {
return usersManager;
}
@Override @Override
public ActiveUsersManager getActiveUsersManager() { public AbstractUsersManager getAbstractUsersManager() {
return activeUsersManager; return usersManager;
} }
@Override @Override
@ -352,7 +353,8 @@ public List<CSQueue> getChildQueues() {
*/ */
@VisibleForTesting @VisibleForTesting
void setUserLimit(int userLimit) { void setUserLimit(int userLimit) {
this.userLimit = userLimit; usersManager.setUserLimit(userLimit);
usersManager.userLimitNeedsRecompute();
} }
/** /**
@ -361,7 +363,8 @@ void setUserLimit(int userLimit) {
*/ */
@VisibleForTesting @VisibleForTesting
void setUserLimitFactor(float userLimitFactor) { void setUserLimitFactor(float userLimitFactor) {
this.userLimitFactor = userLimitFactor; usersManager.setUserLimitFactor(userLimitFactor);
usersManager.userLimitNeedsRecompute();
} }
@Override @Override
@ -422,12 +425,12 @@ public int getNumActiveApplications(String user) {
@Private @Private
public int getUserLimit() { public int getUserLimit() {
return userLimit; return usersManager.getUserLimit();
} }
@Private @Private
public float getUserLimitFactor() { public float getUserLimitFactor() {
return userLimitFactor; return usersManager.getUserLimitFactor();
} }
@Override @Override
@ -477,44 +480,7 @@ public String toString() {
@VisibleForTesting @VisibleForTesting
public User getUser(String userName) { public User getUser(String userName) {
return users.get(userName); return usersManager.getUser(userName);
}
// Get and add user if absent
private User getUserAndAddIfAbsent(String userName) {
try {
writeLock.lock();
User u = users.get(userName);
if (null == u) {
u = new User();
users.put(userName, u);
}
return u;
} finally {
writeLock.unlock();
}
}
/**
* @return an ArrayList of UserInfo objects who are active in this queue
*/
public ArrayList<UserInfo> getUsers() {
try {
readLock.lock();
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry : users.entrySet()) {
User user = entry.getValue();
usersToReturn.add(
new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
user.getActiveApplications(), user.getPendingApplications(),
Resources.clone(user.getConsumedAMResources()),
Resources.clone(user.getUserResourceLimit()),
user.getResourceUsage()));
}
return usersToReturn;
} finally {
readLock.unlock();
}
} }
@Private @Private
@ -575,7 +541,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application,
// TODO, should use getUser, use this method just to avoid UT failure // TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately // which is caused by wrong invoking order, will fix UT separately
User user = getUserAndAddIfAbsent(userName); User user = usersManager.getUserAndAddIfAbsent(userName);
// Add the attempt to our data-structures // Add the attempt to our data-structures
addApplicationAttempt(application, user); addApplicationAttempt(application, user);
@ -632,7 +598,7 @@ public void validateSubmitApplication(ApplicationId applicationId,
} }
// Check submission limits for the user on this queue // Check submission limits for the user on this queue
User user = getUserAndAddIfAbsent(userName); User user = usersManager.getUserAndAddIfAbsent(userName);
if (user.getTotalApplications() >= getMaxApplicationsPerUser()) { if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
String msg = "Queue " + getQueuePath() + " already has " + user String msg = "Queue " + getQueuePath() + " already has " + user
.getTotalApplications() + " applications from user " + userName .getTotalApplications() + " applications from user " + userName
@ -682,19 +648,21 @@ public Resource getUserAMResourceLimitPerPartition(
* the absolute queue capacity (per partition) instead of the max and is * the absolute queue capacity (per partition) instead of the max and is
* modified by the userlimit and the userlimit factor as is the userlimit * modified by the userlimit and the userlimit factor as is the userlimit
*/ */
float effectiveUserLimit = Math.max(userLimit / 100.0f, float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1)); 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
Resource queuePartitionResource = Resources.multiplyAndNormalizeUp( Resource queuePartitionResource = Resources
resourceCalculator, .multiplyAndNormalizeUp(resourceCalculator,
labelManager.getResourceByLabel(nodePartition, lastClusterResource), labelManager.getResourceByLabel(nodePartition,
queueCapacities.getAbsoluteCapacity(nodePartition), lastClusterResource),
minimumAllocation); queueCapacities.getAbsoluteCapacity(nodePartition),
minimumAllocation);
Resource userAMLimit = Resources.multiplyAndNormalizeUp( Resource userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource, resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition) queueCapacities.getMaxAMResourcePercentage(nodePartition)
* effectiveUserLimit * userLimitFactor, minimumAllocation); * effectiveUserLimit * usersManager.getUserLimitFactor(),
minimumAllocation);
return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource, return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ? userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ?
userAMLimit : userAMLimit :
@ -910,7 +878,7 @@ private void addApplicationAttempt(FiCaSchedulerApp application,
@Override @Override
public void finishApplication(ApplicationId application, String user) { public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager // Inform the activeUsersManager
activeUsersManager.deactivateApplication(user, application); usersManager.deactivateApplication(user, application);
appFinished(); appFinished();
@ -932,7 +900,7 @@ private void removeApplicationAttempt(
// TODO, should use getUser, use this method just to avoid UT failure // TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately // which is caused by wrong invoking order, will fix UT separately
User user = getUserAndAddIfAbsent(userName); User user = usersManager.getUserAndAddIfAbsent(userName);
String partitionName = application.getAppAMNodePartitionName(); String partitionName = application.getAppAMNodePartitionName();
boolean wasActive = orderingPolicy.removeSchedulableEntity(application); boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
@ -950,7 +918,7 @@ private void removeApplicationAttempt(
user.finishApplication(wasActive); user.finishApplication(wasActive);
if (user.getTotalApplications() == 0) { if (user.getTotalApplications() == 0) {
users.remove(application.getUser()); usersManager.removeUser(application.getUser());
} }
// Check if we can activate more applications // Check if we can activate more applications
@ -1291,7 +1259,7 @@ protected Resource getHeadroom(User user, Resource queueCurrentLimit,
Resource clusterResource, FiCaSchedulerApp application, Resource clusterResource, FiCaSchedulerApp application,
String partition) { String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource, return getHeadroom(user, queueCurrentLimit, clusterResource,
computeUserLimit(application.getUser(), clusterResource, user, getResourceLimitForActiveUsers(application.getUser(), clusterResource,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
partition); partition);
} }
@ -1365,7 +1333,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
// Compute user limit respect requested labels, // Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also // TODO, need consider headroom respect labels also
Resource userLimit = Resource userLimit =
computeUserLimit(application.getUser(), clusterResource, queueUser, getResourceLimitForActiveUsers(application.getUser(), clusterResource,
nodePartition, schedulingMode); nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource); setQueueResourceLimitsInfo(clusterResource);
@ -1375,11 +1343,11 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
clusterResource, userLimit, nodePartition); clusterResource, userLimit, nodePartition);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " + LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
" userLimit=" + userLimit + + userLimit + " queueMaxAvailRes="
" queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
" consumed=" + queueUser.getUsed() + + queueUser.getUsed() + " headroom=" + headroom + " partition="
" headroom=" + headroom); + nodePartition);
} }
CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider( CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
@ -1407,129 +1375,46 @@ public boolean getRackLocalityFullReset() {
return rackLocalityFullReset; return rackLocalityFullReset;
} }
@Lock(NoLock.class) /**
private Resource computeUserLimit(String userName, *
Resource clusterResource, User user, * @param userName
String nodePartition, SchedulingMode schedulingMode) { * Name of user who has submitted one/more app to given queue.
Resource partitionResource = labelManager.getResourceByLabel(nodePartition, * @param clusterResource
clusterResource); * total cluster resource
* @param nodePartition
// What is our current capacity? * partition name
// * It is equal to the max(required, queue-capacity) if * @param schedulingMode
// we're running below capacity. The 'max' ensures that jobs in queues * scheduling mode
// with miniscule capacity (< 1 slot) make progress * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
// * If we're running over capacity, then its * @return Computed User Limit
// (usedResources + required) (which extra resources we are allocating) */
Resource queueCapacity = public Resource getResourceLimitForActiveUsers(String userName,
Resources.multiplyAndNormalizeUp(resourceCalculator, Resource clusterResource, String nodePartition,
partitionResource, SchedulingMode schedulingMode) {
queueCapacities.getAbsoluteCapacity(nodePartition), return usersManager.getComputedResourceLimitForActiveUsers(userName,
minimumAllocation); clusterResource, nodePartition, schedulingMode);
// Assume we have required resource equals to minimumAllocation, this can
// make sure user limit can continuously increase till queueMaxResource
// reached.
Resource required = minimumAllocation;
// Allow progress for queues with miniscule capacity
queueCapacity =
Resources.max(
resourceCalculator, partitionResource,
queueCapacity,
required);
/* We want to base the userLimit calculation on
* max(queueCapacity, usedResources+required). However, we want
* usedResources to be based on the combined ratios of all the users in the
* queue so we use consumedRatio to calculate such.
* The calculation is dependent on how the resourceCalculator calculates the
* ratio between two Resources. DRF Example: If usedResources is
* greater than queueCapacity and users have the following [mem,cpu] usages:
* User1: [10%,20%] - Dominant resource is 20%
* User2: [30%,10%] - Dominant resource is 30%
* Then total consumedRatio is then 20+30=50%. Yes, this value can be
* larger than 100% but for the purposes of making sure all users are
* getting their fair share, it works.
*/
Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
partitionResource, qUsageRatios.getUsageRatio(nodePartition),
minimumAllocation);
Resource currentCapacity =
Resources.lessThan(resourceCalculator, partitionResource, consumed,
queueCapacity) ? queueCapacity : Resources.add(consumed, required);
// Never allow a single user to take more than the
// queue's configured capacity * user-limit-factor.
// Also, the queue's configured capacity should be higher than
// queue-hard-limit * ulMin
final int activeUsers = activeUsersManager.getNumActiveUsers();
// User limit resource is determined by:
// max{currentCapacity / #activeUsers, currentCapacity *
// user-limit-percentage%)
Resource userLimitResource = Resources.max(
resourceCalculator, partitionResource,
Resources.divideAndCeil(
resourceCalculator, currentCapacity, activeUsers),
Resources.divideAndCeil(
resourceCalculator,
Resources.multiplyAndRoundDown(
currentCapacity, userLimit),
100)
);
// User limit is capped by maxUserLimit
// - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
// - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
//
// In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
// partition, its guaranteed resource on that partition is 0. And
// user-limit-factor computation is based on queue's guaranteed capacity. So
// we will not cap user-limit as well as used resource when doing
// IGNORE_PARTITION_EXCLUSIVITY allocation.
Resource maxUserLimit = Resources.none();
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
maxUserLimit =
Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
maxUserLimit = partitionResource;
}
// Cap final user limit with maxUserLimit
userLimitResource =
Resources.roundUp(
resourceCalculator,
Resources.min(
resourceCalculator, partitionResource,
userLimitResource,
maxUserLimit
),
minimumAllocation);
if (LOG.isDebugEnabled()) {
LOG.debug("User limit computation for " + userName +
" in queue " + getQueueName() +
" userLimitPercent=" + userLimit +
" userLimitFactor=" + userLimitFactor +
" required: " + required +
" consumed: " + consumed +
" user-limit-resource: " + userLimitResource +
" queueCapacity: " + queueCapacity +
" qconsumed: " + queueUsage.getUsed() +
" consumedRatio: " + totalUserConsumedRatio +
" currentCapacity: " + currentCapacity +
" activeUsers: " + activeUsers +
" clusterCapacity: " + clusterResource +
" resourceByLabel: " + partitionResource +
" usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
" Partition: " + nodePartition
);
}
user.setUserResourceLimit(userLimitResource);
return userLimitResource;
} }
/**
*
* @param userName
* Name of user who has submitted one/more app to given queue.
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
public Resource getResourceLimitForAllUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
return usersManager.getComputedResourceLimitForAllUsers(userName,
clusterResource, nodePartition, schedulingMode);
}
@Private @Private
protected boolean canAssignToUser(Resource clusterResource, protected boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application, String userName, Resource limit, FiCaSchedulerApp application,
@ -1600,52 +1485,34 @@ private void updateSchedulerHealthForCompletedContainer(
} }
} }
private float calculateUserUsageRatio(Resource clusterResource, /**
* Recalculate QueueUsage Ratio.
*
* @param clusterResource
* Total Cluster Resource
* @param nodePartition
* Partition
*/
public void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) { String nodePartition) {
try { try {
writeLock.lock(); writeLock.lock();
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition, ResourceUsage queueResourceUsage = getQueueResourceUsage();
clusterResource);
float consumed = 0;
User user;
for (Map.Entry<String, User> entry : users.entrySet()) {
user = entry.getValue();
consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
resourceByLabel, nodePartition);
}
return consumed;
} finally {
writeLock.unlock();
}
}
private void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
try {
writeLock.lock();
ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
if (nodePartition == null) { if (nodePartition == null) {
for (String partition : Sets.union( for (String partition : Sets.union(
queueCapacities.getNodePartitionsSet(), getQueueCapacities().getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) { queueResourceUsage.getNodePartitionsSet())) {
qUsageRatios.setUsageRatio(partition, usersManager.updateUsageRatio(partition, clusterResource);
calculateUserUsageRatio(clusterResource, partition));
} }
} else{ } else {
qUsageRatios.setUsageRatio(nodePartition, usersManager.updateUsageRatio(nodePartition, clusterResource);
calculateUserUsageRatio(clusterResource, nodePartition));
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
} }
private void updateQueueUsageRatio(String nodePartition,
float delta) {
qUsageRatios.incUsageRatio(nodePartition, delta);
}
@Override @Override
public void completedContainer(Resource clusterResource, public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
@ -1708,8 +1575,6 @@ void allocateResource(Resource clusterResource,
try { try {
writeLock.lock(); writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition); super.allocateResource(clusterResource, resource, nodePartition);
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
clusterResource);
// handle ignore exclusivity container // handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@ -1728,16 +1593,9 @@ void allocateResource(Resource clusterResource,
// Update user metrics // Update user metrics
String userName = application.getUser(); String userName = application.getUser();
// TODO, should use getUser, use this method just to avoid UT failure // Increment user's resource usage.
// which is caused by wrong invoking order, will fix UT separately User user = usersManager.updateUserResourceUsage(userName, resource,
User user = getUserAndAddIfAbsent(userName); nodePartition, true);
user.assignContainer(resource, nodePartition);
// Update usage ratios
updateQueueUsageRatio(nodePartition,
user.updateUsageRatio(resourceCalculator, resourceByLabel,
nodePartition));
// Note this is a bit unconventional since it gets the object and modifies // Note this is a bit unconventional since it gets the object and modifies
// it here, rather then using set routine // it here, rather then using set routine
@ -1746,9 +1604,10 @@ void allocateResource(Resource clusterResource,
userName, application.getHeadroom()); userName, application.getHeadroom());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage LOG.debug(getQueueName() + " user=" + userName + " used="
.getUsed() + " numContainers=" + numContainers + " headroom = " + queueUsage.getUsed(nodePartition) + " numContainers="
+ application.getHeadroom() + " user-resources=" + user.getUsed()); + numContainers + " headroom = " + application.getHeadroom()
+ " user-resources=" + user.getUsed());
} }
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
@ -1761,8 +1620,6 @@ void releaseResource(Resource clusterResource,
try { try {
writeLock.lock(); writeLock.lock();
super.releaseResource(clusterResource, resource, nodePartition); super.releaseResource(clusterResource, resource, nodePartition);
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
clusterResource);
// handle ignore exclusivity container // handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals( if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@ -1780,13 +1637,8 @@ void releaseResource(Resource clusterResource,
// Update user metrics // Update user metrics
String userName = application.getUser(); String userName = application.getUser();
User user = getUserAndAddIfAbsent(userName); User user = usersManager.updateUserResourceUsage(userName, resource,
user.releaseContainer(resource, nodePartition); nodePartition, false);
// Update usage ratios
updateQueueUsageRatio(nodePartition,
user.updateUsageRatio(resourceCalculator, resourceByLabel,
nodePartition));
metrics.setAvailableResourcesToUser(nodePartition, metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom()); userName, application.getHeadroom());
@ -1846,6 +1698,10 @@ public void updateClusterResource(Resource clusterResource,
// activate the pending applications if possible // activate the pending applications if possible
activateApplications(); activateApplications();
// In case of any resource change, invalidate recalculateULCount to clear
// the computed user-limit.
usersManager.userLimitNeedsRecompute();
// Update application properties // Update application properties
for (FiCaSchedulerApp application : orderingPolicy for (FiCaSchedulerApp application : orderingPolicy
.getSchedulableEntities()) { .getSchedulableEntities()) {
@ -1861,16 +1717,16 @@ public void updateClusterResource(Resource clusterResource,
@Override @Override
public void incUsedResource(String nodeLabel, Resource resourceToInc, public void incUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) { SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel, usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
resourceToInc); nodeLabel, true);
super.incUsedResource(nodeLabel, resourceToInc, application); super.incUsedResource(nodeLabel, resourceToInc, application);
} }
@Override @Override
public void decUsedResource(String nodeLabel, Resource resourceToDec, public void decUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) { SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel, usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
resourceToDec); nodeLabel, false);
super.decUsedResource(nodeLabel, resourceToDec, application); super.decUsedResource(nodeLabel, resourceToDec, application);
} }
@ -1890,191 +1746,6 @@ public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
queueUsage.decAMUsed(nodeLabel, resourceToDec); queueUsage.decAMUsed(nodeLabel, resourceToDec);
} }
/*
* Usage Ratio
*/
static private class UsageRatios {
private Map<String, Float> usageRatios;
private ReadLock readLock;
private WriteLock writeLock;
public UsageRatios() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
usageRatios = new HashMap<String, Float>();
}
private void incUsageRatio(String label, float delta) {
try {
writeLock.lock();
Float fl = usageRatios.get(label);
if (null == fl) {
fl = new Float(0.0);
}
fl += delta;
usageRatios.put(label, new Float(fl));
} finally {
writeLock.unlock();
}
}
float getUsageRatio(String label) {
try {
readLock.lock();
Float f = usageRatios.get(label);
if (null == f) {
return 0.0f;
}
return f;
} finally {
readLock.unlock();
}
}
private void setUsageRatio(String label, float ratio) {
try {
writeLock.lock();
usageRatios.put(label, new Float(ratio));
} finally {
writeLock.unlock();
}
}
}
@VisibleForTesting
public float getUsageRatio(String label) {
return qUsageRatios.getUsageRatio(label);
}
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
volatile Resource userResourceLimit = Resource.newInstance(0, 0);
volatile int pendingApplications = 0;
volatile int activeApplications = 0;
private UsageRatios userUsageRatios = new UsageRatios();
private WriteLock writeLock;
User() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Nobody uses read-lock now, will add it when necessary
writeLock = lock.writeLock();
}
public ResourceUsage getResourceUsage() {
return userResourceUsage;
}
public float resetAndUpdateUsageRatio(
ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
userUsageRatios.setUsageRatio(nodePartition, 0);
return updateUsageRatio(resourceCalculator, resource, nodePartition);
} finally {
writeLock.unlock();
}
}
public float updateUsageRatio(
ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
float delta;
float newRatio = Resources.ratio(resourceCalculator,
getUsed(nodePartition), resource);
delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
userUsageRatios.setUsageRatio(nodePartition, newRatio);
return delta;
} finally {
writeLock.unlock();
}
}
public Resource getUsed() {
return userResourceUsage.getUsed();
}
public Resource getAllUsed() {
return userResourceUsage.getAllUsed();
}
public Resource getUsed(String label) {
return userResourceUsage.getUsed(label);
}
public int getPendingApplications() {
return pendingApplications;
}
public int getActiveApplications() {
return activeApplications;
}
public Resource getConsumedAMResources() {
return userResourceUsage.getAMUsed();
}
public Resource getConsumedAMResources(String label) {
return userResourceUsage.getAMUsed(label);
}
public int getTotalApplications() {
return getPendingApplications() + getActiveApplications();
}
public void submitApplication() {
try {
writeLock.lock();
++pendingApplications;
} finally {
writeLock.unlock();
}
}
public void activateApplication() {
try {
writeLock.lock();
--pendingApplications;
++activeApplications;
} finally {
writeLock.unlock();
}
}
public void finishApplication(boolean wasActive) {
try {
writeLock.lock();
if (wasActive) {
--activeApplications;
} else{
--pendingApplications;
}
} finally {
writeLock.unlock();
}
}
public void assignContainer(Resource resource, String nodePartition) {
userResourceUsage.incUsed(nodePartition, resource);
}
public void releaseContainer(Resource resource, String nodePartition) {
userResourceUsage.decUsed(nodePartition, resource);
}
public Resource getUserResourceLimit() {
return userResourceLimit;
}
public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
}
@Override @Override
public void recoverContainer(Resource clusterResource, public void recoverContainer(Resource clusterResource,
SchedulerApplicationAttempt attempt, RMContainer rmContainer) { SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
@ -2144,9 +1815,9 @@ public Collection<FiCaSchedulerApp> getAllApplications() {
* excessive preemption. * excessive preemption.
* @return Total pending resource considering user limit * @return Total pending resource considering user limit
*/ */
public Resource getTotalPendingResourcesConsideringUserLimit( public Resource getTotalPendingResourcesConsideringUserLimit(
Resource clusterResources, String partition, boolean deductReservedFromPending) { Resource clusterResources, String partition,
boolean deductReservedFromPending) {
try { try {
readLock.lock(); readLock.lock();
Map<String, Resource> userNameToHeadroom = Map<String, Resource> userNameToHeadroom =
@ -2157,8 +1828,8 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
if (!userNameToHeadroom.containsKey(userName)) { if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName); User user = getUser(userName);
Resource headroom = Resources.subtract( Resource headroom = Resources.subtract(
computeUserLimit(app.getUser(), clusterResources, user, partition, getResourceLimitForActiveUsers(app.getUser(), clusterResources,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition)); user.getUsed(partition));
// Make sure headroom is not negative. // Make sure headroom is not negative.
headroom = Resources.componentwiseMax(headroom, Resources.none()); headroom = Resources.componentwiseMax(headroom, Resources.none());
@ -2188,16 +1859,6 @@ public Resource getTotalPendingResourcesConsideringUserLimit(
} }
public synchronized Resource getUserLimitPerUser(String userName,
Resource resources, String partition) {
// Check user resource limit
User user = getUser(userName);
return computeUserLimit(userName, resources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
@Override @Override
public void collectSchedulerApplications( public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) { Collection<ApplicationAttemptId> apps) {

View File

@ -884,7 +884,7 @@ public void recoverContainer(Resource clusterResource,
} }
@Override @Override
public ActiveUsersManager getActiveUsersManager() { public ActiveUsersManager getAbstractUsersManager() {
// Should never be called since all applications are submitted to LeafQueues // Should never be called since all applications are submitted to LeafQueues
return null; return null;
} }

View File

@ -0,0 +1,982 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* {@link UsersManager} tracks users in the system and its respective data
* structures.
*/
@Private
public class UsersManager implements AbstractUsersManager {
private static final Log LOG = LogFactory.getLog(UsersManager.class);
/*
* Member declaration for UsersManager class.
*/
private final LeafQueue lQueue;
private final RMNodeLabelsManager labelManager;
private final ResourceCalculator resourceCalculator;
private final CapacitySchedulerContext scheduler;
private Map<String, User> users = new ConcurrentHashMap<>();
private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage();
private ResourceUsage totalResUsageForNonActiveUsers = new ResourceUsage();
private Set<String> activeUsersSet = new HashSet<String>();
private Set<String> nonActiveUsersSet = new HashSet<String>();
// Summation of consumed ratios for all users in queue
private UsageRatios qUsageRatios;
// To detect whether there is a change in user count for every user-limit
// calculation.
private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
new HashMap<String, Map<SchedulingMode, Long>>();
private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
new HashMap<String, Map<SchedulingMode, Long>>();
private volatile int userLimit;
private volatile float userLimitFactor;
private WriteLock writeLock;
private ReadLock readLock;
private final QueueMetrics metrics;
private AtomicInteger activeUsers = new AtomicInteger(0);
private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>();
// Pre-computed list of user-limits.
Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
/**
* UsageRatios will store the total used resources ratio across all users of
* the queue.
*/
static private class UsageRatios {
private Map<String, Float> usageRatios;
private ReadLock readLock;
private WriteLock writeLock;
public UsageRatios() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
usageRatios = new HashMap<String, Float>();
}
private void incUsageRatio(String label, float delta) {
try {
writeLock.lock();
float usage = 0f;
if (usageRatios.containsKey(label)) {
usage = usageRatios.get(label);
}
usage += delta;
usageRatios.put(label, usage);
} finally {
writeLock.unlock();
}
}
private float getUsageRatio(String label) {
try {
readLock.lock();
Float f = usageRatios.get(label);
if (null == f) {
return 0.0f;
}
return f;
} finally {
readLock.unlock();
}
}
private void setUsageRatio(String label, float ratio) {
try {
writeLock.lock();
usageRatios.put(label, ratio);
} finally {
writeLock.unlock();
}
}
} /* End of UserRatios class */
/**
* User class stores all user related resource usage, application details.
*/
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
String userName = null;
volatile Resource userResourceLimit = Resource.newInstance(0, 0);
private volatile AtomicInteger pendingApplications = new AtomicInteger(0);
private volatile AtomicInteger activeApplications = new AtomicInteger(0);
private UsageRatios userUsageRatios = new UsageRatios();
private WriteLock writeLock;
public User(String name) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Nobody uses read-lock now, will add it when necessary
writeLock = lock.writeLock();
this.userName = name;
}
public ResourceUsage getResourceUsage() {
return userResourceUsage;
}
public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
userUsageRatios.setUsageRatio(nodePartition, 0);
return updateUsageRatio(resourceCalculator, resource, nodePartition);
} finally {
writeLock.unlock();
}
}
public float updateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
float delta;
float newRatio = Resources.ratio(resourceCalculator,
getUsed(nodePartition), resource);
delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
userUsageRatios.setUsageRatio(nodePartition, newRatio);
return delta;
} finally {
writeLock.unlock();
}
}
public Resource getUsed() {
return userResourceUsage.getUsed();
}
public Resource getAllUsed() {
return userResourceUsage.getAllUsed();
}
public Resource getUsed(String label) {
return userResourceUsage.getUsed(label);
}
public int getPendingApplications() {
return pendingApplications.get();
}
public int getActiveApplications() {
return activeApplications.get();
}
public Resource getConsumedAMResources() {
return userResourceUsage.getAMUsed();
}
public Resource getConsumedAMResources(String label) {
return userResourceUsage.getAMUsed(label);
}
public int getTotalApplications() {
return getPendingApplications() + getActiveApplications();
}
public void submitApplication() {
pendingApplications.incrementAndGet();
}
public void activateApplication() {
pendingApplications.decrementAndGet();
activeApplications.incrementAndGet();
}
public void finishApplication(boolean wasActive) {
if (wasActive) {
activeApplications.decrementAndGet();
} else {
pendingApplications.decrementAndGet();
}
}
public Resource getUserResourceLimit() {
return userResourceLimit;
}
public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
} /* End of User class */
/**
* UsersManager Constructor.
*
* @param metrics
* Queue Metrics
* @param lQueue
* Leaf Queue Object
* @param labelManager
* Label Manager instance
* @param scheduler
* Capacity Scheduler Context
* @param resourceCalculator
* rc
*/
public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
ResourceCalculator resourceCalculator) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.lQueue = lQueue;
this.scheduler = scheduler;
this.labelManager = labelManager;
this.resourceCalculator = resourceCalculator;
this.qUsageRatios = new UsageRatios();
this.metrics = metrics;
this.writeLock = lock.writeLock();
this.readLock = lock.readLock();
}
/**
* Get configured user-limit.
* @return user limit
*/
public int getUserLimit() {
return userLimit;
}
/**
* Set configured user-limit.
* @param userLimit user limit
*/
public void setUserLimit(int userLimit) {
this.userLimit = userLimit;
}
/**
* Get configured user-limit factor.
* @return user-limit factor
*/
public float getUserLimitFactor() {
return userLimitFactor;
}
/**
* Set configured user-limit factor.
* @param userLimitFactor User Limit factor.
*/
public void setUserLimitFactor(float userLimitFactor) {
this.userLimitFactor = userLimitFactor;
}
@VisibleForTesting
public float getUsageRatio(String label) {
return qUsageRatios.getUsageRatio(label);
}
/**
* Force UsersManager to recompute userlimit.
*/
public void userLimitNeedsRecompute() {
// If latestVersionOfUsersState is negative due to overflow, ideally we need
// to reset it. This method is invoked from UsersManager and LeafQueue and
// all is happening within write/readLock. Below logic can help to set 0.
try {
writeLock.lock();
long value = latestVersionOfUsersState.incrementAndGet();
if (value < 0) {
latestVersionOfUsersState.set(0);
}
} finally {
writeLock.unlock();
}
}
/*
* Get all users of queue.
*/
private Map<String, User> getUsers() {
return users;
}
/**
* Get user object for given user name.
*
* @param userName
* User Name
* @return User object
*/
public User getUser(String userName) {
return users.get(userName);
}
/**
* Remove user.
*
* @param userName
* User Name
*/
public void removeUser(String userName) {
try {
writeLock.lock();
this.users.remove(userName);
// Remove user from active/non-active list as well.
activeUsersSet.remove(userName);
nonActiveUsersSet.remove(userName);
} finally {
writeLock.unlock();
}
}
/**
* Get and add user if absent.
*
* @param userName
* User Name
* @return User object
*/
public User getUserAndAddIfAbsent(String userName) {
try {
writeLock.lock();
User u = getUser(userName);
if (null == u) {
u = new User(userName);
addUser(userName, u);
// Add to nonActive list so that resourceUsage could be tracked
if (!nonActiveUsersSet.contains(userName)) {
nonActiveUsersSet.add(userName);
}
}
return u;
} finally {
writeLock.unlock();
}
}
/*
* Add a new user
*/
private void addUser(String userName, User user) {
this.users.put(userName, user);
}
/**
* @return an ArrayList of UserInfo objects who are active in this queue
*/
public ArrayList<UserInfo> getUsersInfo() {
try {
readLock.lock();
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry : getUsers().entrySet()) {
User user = entry.getValue();
usersToReturn.add(
new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
user.getActiveApplications(), user.getPendingApplications(),
Resources.clone(user.getConsumedAMResources()),
Resources.clone(user.getUserResourceLimit()),
user.getResourceUsage()));
}
return usersToReturn;
} finally {
readLock.unlock();
}
}
/**
* Get computed user-limit for all ACTIVE users in this queue. If cached data
* is invalidated due to resource change, this method also enforce to
* recompute user-limit.
*
* @param userName
* Name of user who has submitted one/more app to given queue.
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
public Resource getComputedResourceLimitForActiveUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode = preComputedActiveUserLimit
.get(nodePartition);
try {
writeLock.lock();
if (isRecomputeNeeded(schedulingMode, nodePartition, true)) {
// recompute
userLimitPerSchedulingMode = reComputeUserLimits(userName,
nodePartition, clusterResource, schedulingMode, true);
// update user count to cache so that we can avoid recompute if no major
// changes.
setLocalVersionOfUsersState(nodePartition, schedulingMode, true);
}
} finally {
writeLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("userLimit is fetched. userLimit = "
+ userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
+ schedulingMode + ", partition=" + nodePartition);
}
return userLimitPerSchedulingMode.get(schedulingMode);
}
/**
* Get computed user-limit for all users in this queue. If cached data is
* invalidated due to resource change, this method also enforce to recompute
* user-limit.
*
* @param userName
* Name of user who has submitted one/more app to given queue.
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
public Resource getComputedResourceLimitForAllUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode = preComputedAllUserLimit
.get(nodePartition);
try {
writeLock.lock();
if (isRecomputeNeeded(schedulingMode, nodePartition, false)) {
// recompute
userLimitPerSchedulingMode = reComputeUserLimits(userName,
nodePartition, clusterResource, schedulingMode, false);
// update user count to cache so that we can avoid recompute if no major
// changes.
setLocalVersionOfUsersState(nodePartition, schedulingMode, false);
}
} finally {
writeLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("userLimit is fetched. userLimit = "
+ userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
+ schedulingMode + ", partition=" + nodePartition);
}
return userLimitPerSchedulingMode.get(schedulingMode);
}
/*
* Recompute user-limit under following conditions: 1. cached user-limit does
* not exist in local map. 2. Total User count doesn't match with local cached
* version.
*/
private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
String nodePartition, boolean isActive) {
return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
isActive) != latestVersionOfUsersState.get());
}
/*
* Set Local version of user count per label to invalidate cache if needed.
*/
private void setLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
try {
writeLock.lock();
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
.get(nodePartition);
if (null == localVersion) {
localVersion = new HashMap<SchedulingMode, Long>();
localVersionOfUsersState.put(nodePartition, localVersion);
}
localVersion.put(schedulingMode, latestVersionOfUsersState.get());
} finally {
writeLock.unlock();
}
}
/*
* Get Local version of user count per label to invalidate cache if needed.
*/
private long getLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
try {
this.readLock.lock();
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
if (!localVersionOfUsersState.containsKey(nodePartition)) {
return -1;
}
Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
.get(nodePartition);
if (!localVersion.containsKey(schedulingMode)) {
return -1;
}
return localVersion.get(schedulingMode);
} finally {
readLock.unlock();
}
}
private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
String nodePartition, Resource clusterResource,
SchedulingMode schedulingMode, boolean activeMode) {
// preselect stored map as per active user-limit or all user computation.
Map<String, Map<SchedulingMode, Resource>> computedMap = null;
computedMap = (activeMode)
? preComputedActiveUserLimit
: preComputedAllUserLimit;
Map<SchedulingMode, Resource> userLimitPerSchedulingMode = computedMap
.get(nodePartition);
if (userLimitPerSchedulingMode == null) {
userLimitPerSchedulingMode = new ConcurrentHashMap<>();
computedMap.put(nodePartition, userLimitPerSchedulingMode);
}
// compute user-limit per scheduling mode.
Resource computedUserLimit = computeUserLimit(userName, clusterResource,
nodePartition, schedulingMode, activeMode);
// update in local storage
userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
return userLimitPerSchedulingMode;
}
private Resource computeUserLimit(String userName, Resource clusterResource,
String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
clusterResource);
/*
* What is our current capacity?
* * It is equal to the max(required, queue-capacity) if we're running
* below capacity. The 'max' ensures that jobs in queues with miniscule
* capacity (< 1 slot) make progress
* * If we're running over capacity, then its (usedResources + required)
* (which extra resources we are allocating)
*/
Resource queueCapacity = Resources.multiplyAndNormalizeUp(
resourceCalculator, partitionResource,
lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
lQueue.getMinimumAllocation());
/*
* Assume we have required resource equals to minimumAllocation, this can
* make sure user limit can continuously increase till queueMaxResource
* reached.
*/
Resource required = lQueue.getMinimumAllocation();
// Allow progress for queues with miniscule capacity
queueCapacity = Resources.max(resourceCalculator, partitionResource,
queueCapacity, required);
/*
* We want to base the userLimit calculation on max(queueCapacity,
* usedResources+required). However, we want usedResources to be based on
* the combined ratios of all the users in the queue so we use consumedRatio
* to calculate such. The calculation is dependent on how the
* resourceCalculator calculates the ratio between two Resources. DRF
* Example: If usedResources is greater than queueCapacity and users have
* the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is
* 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio
* is then 20+30=50%. Yes, this value can be larger than 100% but for the
* purposes of making sure all users are getting their fair share, it works.
*/
Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
partitionResource, getUsageRatio(nodePartition),
lQueue.getMinimumAllocation());
Resource currentCapacity = Resources.lessThan(resourceCalculator,
partitionResource, consumed, queueCapacity)
? queueCapacity
: Resources.add(consumed, required);
/*
* Never allow a single user to take more than the queue's configured
* capacity * user-limit-factor. Also, the queue's configured capacity
* should be higher than queue-hard-limit * ulMin
*/
int usersCount = getNumActiveUsers();
Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
// For non-activeUser calculation, consider all users count.
if (!activeUser) {
resourceUsed = currentCapacity;
usersCount = users.size();
}
/*
* User limit resource is determined by: max{currentCapacity / #activeUsers,
* currentCapacity * user-limit-percentage%)
*/
Resource userLimitResource = Resources.max(resourceCalculator,
partitionResource,
Resources.divideAndCeil(resourceCalculator, resourceUsed,
usersCount),
Resources.divideAndCeil(resourceCalculator,
Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
100));
// User limit is capped by maxUserLimit
// - maxUserLimit = queueCapacity * user-limit-factor
// (RESPECT_PARTITION_EXCLUSIVITY)
// - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
//
// In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
// partition, its guaranteed resource on that partition is 0. And
// user-limit-factor computation is based on queue's guaranteed capacity. So
// we will not cap user-limit as well as used resource when doing
// IGNORE_PARTITION_EXCLUSIVITY allocation.
Resource maxUserLimit = Resources.none();
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
getUserLimitFactor());
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
maxUserLimit = partitionResource;
}
// Cap final user limit with maxUserLimit
userLimitResource = Resources
.roundUp(resourceCalculator,
Resources.min(resourceCalculator, partitionResource,
userLimitResource, maxUserLimit),
lQueue.getMinimumAllocation());
if (LOG.isDebugEnabled()) {
LOG.debug("User limit computation for " + userName + " in queue "
+ lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
+ " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
+ required + " consumed: " + consumed + " user-limit-resource: "
+ userLimitResource + " queueCapacity: " + queueCapacity
+ " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
+ " currentCapacity: " + currentCapacity + " activeUsers: "
+ usersCount + " clusterCapacity: " + clusterResource
+ " resourceByLabel: " + partitionResource + " usageratio: "
+ getUsageRatio(nodePartition) + " Partition: " + nodePartition);
}
getUser(userName).setUserResourceLimit(userLimitResource);
return userLimitResource;
}
/**
* Update new usage ratio.
*
* @param partition
* Node partition
* @param clusterResource
* Cluster Resource
*/
public void updateUsageRatio(String partition, Resource clusterResource) {
try {
writeLock.lock();
Resource resourceByLabel = labelManager.getResourceByLabel(partition,
clusterResource);
float consumed = 0;
User user;
for (Map.Entry<String, User> entry : getUsers().entrySet()) {
user = entry.getValue();
consumed += user.setAndUpdateUsageRatio(resourceCalculator,
resourceByLabel, partition);
}
qUsageRatios.setUsageRatio(partition, consumed);
} finally {
writeLock.unlock();
}
}
/*
* Increment Queue Usage Ratio.
*/
private void incQueueUsageRatio(String nodePartition, float delta) {
qUsageRatios.incUsageRatio(nodePartition, delta);
}
@Override
public void activateApplication(String user, ApplicationId applicationId) {
try {
this.writeLock.lock();
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps == null) {
userApps = new HashSet<ApplicationId>();
usersApplications.put(user, userApps);
activeUsers.incrementAndGet();
metrics.incrActiveUsers();
// A user is added to active list. Invalidate user-limit cache.
userLimitNeedsRecompute();
updateActiveUsersResourceUsage(user);
if (LOG.isDebugEnabled()) {
LOG.debug("User " + user + " added to activeUsers, currently: "
+ activeUsers);
}
}
if (userApps.add(applicationId)) {
metrics.activateApp(user);
}
} finally {
this.writeLock.unlock();
}
}
@Override
public void deactivateApplication(String user, ApplicationId applicationId) {
try {
this.writeLock.lock();
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps != null) {
if (userApps.remove(applicationId)) {
metrics.deactivateApp(user);
}
if (userApps.isEmpty()) {
usersApplications.remove(user);
activeUsers.decrementAndGet();
metrics.decrActiveUsers();
// A user is removed from active list. Invalidate user-limit cache.
userLimitNeedsRecompute();
updateNonActiveUsersResourceUsage(user);
if (LOG.isDebugEnabled()) {
LOG.debug("User " + user + " removed from activeUsers, currently: "
+ activeUsers);
}
}
}
} finally {
this.writeLock.unlock();
}
}
@Override
public int getNumActiveUsers() {
return activeUsers.get();
}
private void updateActiveUsersResourceUsage(String userName) {
try {
this.writeLock.lock();
// For UT case: We might need to add the user to users list.
User user = getUserAndAddIfAbsent(userName);
ResourceUsage resourceUsage = user.getResourceUsage();
// If User is moved to active list, moved resource usage from non-active
// to active list.
if (nonActiveUsersSet.contains(userName)) {
nonActiveUsersSet.remove(userName);
activeUsersSet.add(userName);
// Update total resource usage of active and non-active after user
// is moved from non-active to active.
for (String partition : resourceUsage.getNodePartitionsSet()) {
totalResUsageForNonActiveUsers.decUsed(partition,
resourceUsage.getUsed(partition));
totalResUsageForActiveUsers.incUsed(partition,
resourceUsage.getUsed(partition));
}
if (LOG.isDebugEnabled()) {
LOG.debug("User '" + userName
+ "' has become active. Hence move user to active list."
+ "Active users size = " + activeUsersSet.size()
+ "Non-active users size = " + nonActiveUsersSet.size()
+ "Total Resource usage for active users="
+ totalResUsageForActiveUsers.getAllUsed() + "."
+ "Total Resource usage for non-active users="
+ totalResUsageForNonActiveUsers.getAllUsed());
}
}
} finally {
this.writeLock.unlock();
}
}
private void updateNonActiveUsersResourceUsage(String userName) {
try {
this.writeLock.lock();
// For UT case: We might need to add the user to users list.
User user = getUserAndAddIfAbsent(userName);
ResourceUsage resourceUsage = user.getResourceUsage();
// If User is moved to non-active list, moved resource usage from
// non-active to active list.
if (activeUsersSet.contains(userName)) {
activeUsersSet.remove(userName);
nonActiveUsersSet.add(userName);
// Update total resource usage of active and non-active after user is
// moved from active to non-active.
for (String partition : resourceUsage.getNodePartitionsSet()) {
totalResUsageForActiveUsers.decUsed(partition,
resourceUsage.getUsed(partition));
totalResUsageForNonActiveUsers.incUsed(partition,
resourceUsage.getUsed(partition));
if (LOG.isDebugEnabled()) {
LOG.debug("User '" + userName
+ "' has become non-active.Hence move user to non-active list."
+ "Active users size = " + activeUsersSet.size()
+ "Non-active users size = " + nonActiveUsersSet.size()
+ "Total Resource usage for active users="
+ totalResUsageForActiveUsers.getAllUsed() + "."
+ "Total Resource usage for non-active users="
+ totalResUsageForNonActiveUsers.getAllUsed());
}
}
}
} finally {
this.writeLock.unlock();
}
}
private ResourceUsage getTotalResourceUsagePerUser(String userName) {
if (nonActiveUsersSet.contains(userName)) {
return totalResUsageForNonActiveUsers;
} else if (activeUsersSet.contains(userName)) {
return totalResUsageForActiveUsers;
} else {
LOG.warn("User '" + userName
+ "' is not present in active/non-active. This is highly unlikely."
+ "We can consider this user in non-active list in this case.");
return totalResUsageForNonActiveUsers;
}
}
/**
* During container allocate/release, ensure that all user specific data
* structures are updated.
*
* @param userName
* Name of the user
* @param resource
* Resource to increment/decrement
* @param nodePartition
* Node label
* @param isAllocate
* Indicate whether to allocate or release resource
* @return user
*/
public User updateUserResourceUsage(String userName, Resource resource,
String nodePartition, boolean isAllocate) {
try {
this.writeLock.lock();
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = getUserAndAddIfAbsent(userName);
// New container is allocated. Invalidate user-limit.
updateResourceUsagePerUser(user, resource, nodePartition, isAllocate);
userLimitNeedsRecompute();
// Update usage ratios
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
scheduler.getClusterResource());
incQueueUsageRatio(nodePartition, user.updateUsageRatio(
resourceCalculator, resourceByLabel, nodePartition));
return user;
} finally {
this.writeLock.unlock();
}
}
private void updateResourceUsagePerUser(User user, Resource resource,
String nodePartition, boolean isAllocate) {
ResourceUsage totalResourceUsageForUsers = getTotalResourceUsagePerUser(
user.userName);
if (isAllocate) {
user.getResourceUsage().incUsed(nodePartition, resource);
totalResourceUsageForUsers.incUsed(nodePartition, resource);
} else {
user.getResourceUsage().decUsed(nodePartition, resource);
totalResourceUsageForUsers.decUsed(nodePartition, resource);
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"User resource is updated." + "Total Resource usage for active users="
+ totalResUsageForActiveUsers.getAllUsed() + "."
+ "Total Resource usage for non-active users="
+ totalResUsageForNonActiveUsers.getAllUsed());
}
}
}

View File

@ -54,7 +54,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
@ -115,24 +115,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) { RMContext rmContext) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext, this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
Priority.newInstance(0), false); Priority.newInstance(0), false);
} }
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) { RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext, this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
appPriority, isAttemptRecovering, null); appPriority, isAttemptRecovering, null);
} }
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) { ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext); super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); RMApp rmApp = rmContext.getRMApps().get(getApplicationId());

View File

@ -488,7 +488,7 @@ public int getNumActiveApps() {
} }
@Override @Override
public ActiveUsersManager getActiveUsersManager() { public ActiveUsersManager getAbstractUsersManager() {
return activeUsersManager; return activeUsersManager;
} }

View File

@ -281,7 +281,7 @@ public void collectSchedulerApplications(
} }
@Override @Override
public ActiveUsersManager getActiveUsersManager() { public ActiveUsersManager getAbstractUsersManager() {
// Should never be called since all applications are submitted to LeafQueues // Should never be called since all applications are submitted to LeafQueues
return null; return null;
} }

View File

@ -177,7 +177,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
} }
@Override @Override
public ActiveUsersManager getActiveUsersManager() { public ActiveUsersManager getAbstractUsersManager() {
return activeUsersManager; return activeUsersManager;
} }

View File

@ -63,7 +63,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
maxApplications = q.getMaxApplications(); maxApplications = q.getMaxApplications();
maxApplicationsPerUser = q.getMaxApplicationsPerUser(); maxApplicationsPerUser = q.getMaxApplicationsPerUser();
userLimit = q.getUserLimit(); userLimit = q.getUserLimit();
users = new UsersInfo(q.getUsers()); users = new UsersInfo(q.getUsersManager().getUsersInfo());
userLimitFactor = q.getUserLimitFactor(); userLimitFactor = q.getUserLimitFactor();
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit()); AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed()); usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -362,9 +363,10 @@ private void mockApplications(String appsConfig) {
queue.getQueueCapacities().getAbsoluteCapacity()); queue.getQueueCapacities().getAbsoluteCapacity());
HashSet<String> users = userMap.get(queue.getQueueName()); HashSet<String> users = userMap.get(queue.getQueueName());
Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size()); Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
for (String user : users) { for (String userName : users) {
when(queue.getUserLimitPerUser(eq(user), any(Resource.class), when(queue.getResourceLimitForAllUsers(eq(userName),
anyString())).thenReturn(userLimit); any(Resource.class), anyString(), any(SchedulingMode.class)))
.thenReturn(userLimit);
} }
} }
} }

View File

@ -73,7 +73,7 @@ public void testMove() {
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, oldQueue, oldQueue.getActiveUsersManager(), rmContext); user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext);
oldMetrics.submitApp(user); oldMetrics.submitApp(user);
// confirm that containerId is calculated based on epoch. // confirm that containerId is calculated based on epoch.
@ -169,7 +169,7 @@ private Queue createQueue(String name, Queue parent, float capacity) {
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics); ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class); Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics); when(queue.getMetrics()).thenReturn(metrics);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo); when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
return queue; return queue;
} }
@ -198,7 +198,7 @@ public void testAppPercentages() throws Exception {
Queue queue = createQueue("test", null); Queue queue = createQueue("test", null);
SchedulerApplicationAttempt app = SchedulerApplicationAttempt app =
new SchedulerApplicationAttempt(appAttId, user, queue, new SchedulerApplicationAttempt(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext); queue.getAbstractUsersManager(), rmContext);
// Resource request // Resource request
Resource requestedResource = Resource.newInstance(1536, 2); Resource requestedResource = Resource.newInstance(1536, 2);
@ -211,7 +211,7 @@ public void testAppPercentages() throws Exception {
queue = createQueue("test2", null, 0.5f); queue = createQueue("test2", null, 0.5f);
app = new SchedulerApplicationAttempt(appAttId, user, queue, app = new SchedulerApplicationAttempt(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext); queue.getAbstractUsersManager(), rmContext);
app.attemptResourceUsage.incUsed(requestedResource); app.attemptResourceUsage.incUsed(requestedResource);
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(), assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
0.01f); 0.01f);
@ -229,7 +229,7 @@ public void testAppPercentages() throws Exception {
queue = createQueue("test3", null, 0.0f); queue = createQueue("test3", null, 0.0f);
app = new SchedulerApplicationAttempt(appAttId, user, queue, app = new SchedulerApplicationAttempt(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext); queue.getAbstractUsersManager(), rmContext);
// Resource request // Resource request
app.attemptResourceUsage.incUsed(requestedResource); app.attemptResourceUsage.incUsed(requestedResource);
@ -255,7 +255,7 @@ public void testAppPercentagesOnswitch() throws Exception {
final String user = "user1"; final String user = "user1";
Queue queue = createQueue("test", null); Queue queue = createQueue("test", null);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId, SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, queue, queue.getActiveUsersManager(), rmContext); user, queue, queue.getAbstractUsersManager(), rmContext);
// Resource request // Resource request
Resource requestedResource = Resource.newInstance(1536, 2); Resource requestedResource = Resource.newInstance(1536, 2);
@ -274,7 +274,7 @@ public void testSchedulingOpportunityOverflow() throws Exception {
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L); when(rmContext.getEpoch()).thenReturn(3L);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt( SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
attemptId, "user", queue, queue.getActiveUsersManager(), rmContext); attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
Priority priority = Priority.newInstance(1); Priority priority = Priority.newInstance(1);
SchedulerRequestKey schedulerKey = toSchedulerKey(priority); SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
assertEquals(0, app.getSchedulingOpportunities(schedulerKey)); assertEquals(0, app.getSchedulingOpportunities(schedulerKey));

View File

@ -193,7 +193,7 @@ public void testAMResourceLimit() throws Exception {
clusterResource)); clusterResource));
ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class); ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
assertEquals(Resource.newInstance(8 * GB, 1), assertEquals(Resource.newInstance(8 * GB, 1),
queue.calculateAndGetAMResourceLimit()); queue.calculateAndGetAMResourceLimit());
@ -634,7 +634,7 @@ public void testHeadroom() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp( FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
appAttemptId_0_0, user_0, queue, appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), spyRMContext); queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0); queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@ -646,7 +646,7 @@ public void testHeadroom() throws Exception {
// Schedule to compute // Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits( queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); Resource expectedHeadroom = Resources.createResource(5*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom // Submit second application from user_0, check headroom
@ -654,7 +654,7 @@ public void testHeadroom() throws Exception {
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp( FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
appAttemptId_0_1, user_0, queue, appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), spyRMContext); queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0); queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@ -674,7 +674,7 @@ public void testHeadroom() throws Exception {
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp( FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
appAttemptId_1_0, user_1, queue, appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), spyRMContext); queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1); queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@ -693,6 +693,11 @@ public void testHeadroom() throws Exception {
// Now reduce cluster size and check for the smaller headroom // Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB); clusterResource = Resources.createResource(90*16*GB);
// Any change is cluster resource needs to enforce user-limit recomputation.
// In existing code, LeafQueue#updateClusterResource handled this. However
// here that method was not used.
queue.getUsersManager().userLimitNeedsRecompute();
queue.assignContainers(clusterResource, node_0, new ResourceLimits( queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes

View File

@ -659,7 +659,7 @@ public void testHeadroom() throws Exception {
final ApplicationAttemptId appAttemptId_0_0 = final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0, FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0,
queue, queue.getActiveUsersManager(), spyRMContext); queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0); queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@ -671,16 +671,16 @@ public void testHeadroom() throws Exception {
queue.assignContainers(clusterResource, node_0, queue.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource), new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
//head room = queue capacity = 50 % 90% 160 GB //head room = queue capacity = 50 % 90% 160 GB * 0.25 (UL)
Resource expectedHeadroom = Resource expectedHeadroom =
Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1); Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom // Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 = final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0, FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0,
queue, queue.getActiveUsersManager(), spyRMContext); queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0); queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@ -703,15 +703,16 @@ public void testHeadroom() throws Exception {
assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change
//head room for default label + head room for y partition //head room for default label + head room for y partition
//head room for y partition = 100% 50%(b queue capacity ) * 160 * GB //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB
Resource expectedHeadroomWithReqInY = Resource expectedHeadroomWithReqInY = Resources.add(
Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom); Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
expectedHeadroom);
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
// Submit first application from user_1, check for new headroom // Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 = final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1, FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1,
queue, queue.getActiveUsersManager(), spyRMContext); queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1); queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>(); List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@ -730,12 +731,12 @@ public void testHeadroom() throws Exception {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
//head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users) //head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users)
expectedHeadroom = expectedHeadroom =
Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1); Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
//head room for default label + head room for y partition //head room for default label + head room for y partition
//head room for y partition = 100% 50%(b queue capacity ) * 160 * GB //head room for y partition = 100% 50%(b queue capacity ) * 160 * GB
expectedHeadroomWithReqInY = expectedHeadroomWithReqInY = Resources.add(
Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1), Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
expectedHeadroom); expectedHeadroom);
assertEquals(expectedHeadroom, app_0_0.getHeadroom()); assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom()); assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom()); assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom());

View File

@ -186,7 +186,7 @@ private void checkUserUsedResource(MockRM rm, String queueName,
String userName, String partition, int memory) { String userName, String partition, int memory) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName); LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
LeafQueue.User user = queue.getUser(userName); UsersManager.User user = queue.getUser(userName);
Assert.assertEquals(memory, Assert.assertEquals(memory,
user.getResourceUsage().getUsed(partition).getMemorySize()); user.getResourceUsage().getUsed(partition).getMemorySize());
} }
@ -243,7 +243,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
LeafQueue queue = LeafQueue queue =
(LeafQueue) ((CapacityScheduler) rm.getResourceScheduler()) (LeafQueue) ((CapacityScheduler) rm.getResourceScheduler())
.getQueue("a"); .getQueue("a");
ArrayList<UserInfo> users = queue.getUsers(); ArrayList<UserInfo> users = queue.getUsersManager().getUsersInfo();
for (UserInfo userInfo : users) { for (UserInfo userInfo : users) {
if (userInfo.getUsername().equals("user")) { if (userInfo.getUsername().equals("user")) {
ResourceInfo resourcesUsed = userInfo.getResourcesUsed(); ResourceInfo resourcesUsed = userInfo.getResourcesUsed();

View File

@ -69,6 +69,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@ -77,10 +78,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -523,19 +522,22 @@ public void testSingleQueueWithOneUser() throws Exception {
// Users // Users
final String user_0 = "user_0"; final String user_0 = "user_0";
// Active Users Manager
AbstractUsersManager activeUserManager = a.getAbstractUsersManager();
// Submit applications // Submit applications
final ApplicationAttemptId appAttemptId_0 = final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext); activeUserManager, spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), spyRMContext); activeUserManager, spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
@ -684,7 +686,7 @@ public void testDRFUsageRatioRounding() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 = FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, b, new FiCaSchedulerApp(appAttemptId0, user0, b,
b.getActiveUsersManager(), spyRMContext); b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app0, user0); b.submitApplicationAttempt(app0, user0);
// Setup some nodes // Setup some nodes
@ -748,14 +750,14 @@ public void testDRFUserLimits() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 = FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, b, new FiCaSchedulerApp(appAttemptId0, user0, b,
b.getActiveUsersManager(), spyRMContext); b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app0, user0); b.submitApplicationAttempt(app0, user0);
final ApplicationAttemptId appAttemptId2 = final ApplicationAttemptId appAttemptId2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app2 = FiCaSchedulerApp app2 =
new FiCaSchedulerApp(appAttemptId2, user1, b, new FiCaSchedulerApp(appAttemptId2, user1, b,
b.getActiveUsersManager(), spyRMContext); b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app2, user1); b.submitApplicationAttempt(app2, user1);
// Setup some nodes // Setup some nodes
@ -776,6 +778,7 @@ public void testDRFUserLimits() throws Exception {
Resource clusterResource = Resource clusterResource =
Resources.createResource(numNodes * (8 * GB), numNodes * 100); Resources.createResource(numNodes * (8 * GB), numNodes * 100);
when(csContext.getNumClusterNodes()).thenReturn(numNodes); when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
// Setup resource-requests so that one application is memory dominant // Setup resource-requests so that one application is memory dominant
// and other application is vcores dominant // and other application is vcores dominant
@ -799,7 +802,7 @@ public void testDRFUserLimits() throws Exception {
User queueUser1 = b.getUser(user1); User queueUser1 = b.getUser(user1);
assertEquals("There should 2 active users!", 2, b assertEquals("There should 2 active users!", 2, b
.getActiveUsersManager().getNumActiveUsers()); .getAbstractUsersManager().getNumActiveUsers());
// Fill both Nodes as far as we can // Fill both Nodes as far as we can
CSAssignment assign; CSAssignment assign;
do { do {
@ -834,7 +837,7 @@ public void testDRFUserLimits() throws Exception {
/ (numNodes * 100.0f) / (numNodes * 100.0f)
+ queueUser1.getUsed().getMemorySize() + queueUser1.getUsed().getMemorySize()
/ (numNodes * 8.0f * GB); / (numNodes * 8.0f * GB);
assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
// Add another node and make sure consumedRatio is adjusted // Add another node and make sure consumedRatio is adjusted
// accordingly. // accordingly.
numNodes = 3; numNodes = 3;
@ -848,7 +851,7 @@ public void testDRFUserLimits() throws Exception {
/ (numNodes * 100.0f) / (numNodes * 100.0f)
+ queueUser1.getUsed().getMemorySize() + queueUser1.getUsed().getMemorySize()
/ (numNodes * 8.0f * GB); / (numNodes * 8.0f * GB);
assertEquals(expectedRatio, b.getUsageRatio(""), 0.001); assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
} }
@Test @Test
@ -858,6 +861,9 @@ public void testUserLimits() throws Exception {
//unset maxCapacity //unset maxCapacity
a.setMaxCapacity(1.0f); a.setMaxCapacity(1.0f);
when(csContext.getClusterResource())
.thenReturn(Resources.createResource(16 * GB, 32));
// Users // Users
final String user_0 = "user_0"; final String user_0 = "user_0";
final String user_1 = "user_1"; final String user_1 = "user_1";
@ -867,14 +873,14 @@ public void testUserLimits() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a, new FiCaSchedulerApp(appAttemptId_1, user_1, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_1); // different user a.submitApplicationAttempt(app_1, user_1); // different user
// Setup some nodes // Setup some nodes
@ -913,7 +919,7 @@ public void testUserLimits() throws Exception {
a.setUserLimitFactor(2); a.setUserLimitFactor(2);
// There're two active users // There're two active users
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
// 1 container to user_0 // 1 container to user_0
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
@ -948,7 +954,7 @@ public void testUserLimits() throws Exception {
// app_0 doesn't have outstanding resources, there's only one active user. // app_0 doesn't have outstanding resources, there's only one active user.
assertEquals("There should only be 1 active user!", assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers()); 1, a.getAbstractUsersManager().getNumActiveUsers());
} }
@ -999,7 +1005,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, qb, new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
qb.getActiveUsersManager(), spyRMContext); qb.getAbstractUsersManager(), spyRMContext);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>(); Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>();
apps.put(app_0.getApplicationAttemptId(), app_0); apps.put(app_0.getApplicationAttemptId(), app_0);
qb.submitApplicationAttempt(app_0, user_0); qb.submitApplicationAttempt(app_0, user_0);
@ -1010,7 +1016,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
u0Priority, recordFactory))); u0Priority, recordFactory)));
assertEquals("There should only be 1 active user!", assertEquals("There should only be 1 active user!",
1, qb.getActiveUsersManager().getNumActiveUsers()); 1, qb.getAbstractUsersManager().getNumActiveUsers());
//get headroom //get headroom
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
qb.assignContainers(clusterResource, node_0, qb.assignContainers(clusterResource, node_0,
@ -1027,7 +1033,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, qb, new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
qb.getActiveUsersManager(), spyRMContext); qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_2.getApplicationAttemptId(), app_2); apps.put(app_2.getApplicationAttemptId(), app_2);
Priority u1Priority = TestUtils.createMockPriority(2); Priority u1Priority = TestUtils.createMockPriority(2);
SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority); SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority);
@ -1065,13 +1071,13 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, qb, new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
qb.getActiveUsersManager(), spyRMContext); qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_1.getApplicationAttemptId(), app_1); apps.put(app_1.getApplicationAttemptId(), app_1);
final ApplicationAttemptId appAttemptId_3 = final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0); TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 = FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_1, qb, new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
qb.getActiveUsersManager(), spyRMContext); qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_3.getApplicationAttemptId(), app_3); apps.put(app_3.getApplicationAttemptId(), app_3);
app_1.updateResourceRequests(Collections.singletonList( app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
@ -1100,7 +1106,7 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
TestUtils.getMockApplicationAttemptId(4, 0); TestUtils.getMockApplicationAttemptId(4, 0);
FiCaSchedulerApp app_4 = FiCaSchedulerApp app_4 =
new FiCaSchedulerApp(appAttemptId_4, user_0, qb, new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
qb.getActiveUsersManager(), spyRMContext); qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_4.getApplicationAttemptId(), app_4); apps.put(app_4.getApplicationAttemptId(), app_4);
qb.submitApplicationAttempt(app_4, user_0); qb.submitApplicationAttempt(app_4, user_0);
app_4.updateResourceRequests(Collections.singletonList( app_4.updateResourceRequests(Collections.singletonList(
@ -1123,9 +1129,9 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
//testcase3 still active - 2+2+6=10 //testcase3 still active - 2+2+6=10
assertEquals(10*GB, qb.getUsedResources().getMemorySize()); assertEquals(10*GB, qb.getUsedResources().getMemorySize());
//app4 is user 0 //app4 is user 0
//maxqueue 16G, userlimit 13G, used 8G, headroom 5G //maxqueue 16G, userlimit 7G, used 8G, headroom 5G
//(8G used is 6G from this test case - app4, 2 from last test case, app_1) //(8G used is 6G from this test case - app4, 2 from last test case, app_1)
assertEquals(5*GB, app_4.getHeadroom().getMemorySize()); assertEquals(0*GB, app_4.getHeadroom().getMemorySize());
} }
@Test @Test
@ -1144,21 +1150,21 @@ public void testUserHeadroomMultiApp() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a, new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1); a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes // Setup some nodes
@ -1244,21 +1250,21 @@ public void testHeadroomWithMaxCap() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a, new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1); a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes // Setup some nodes
@ -1298,7 +1304,7 @@ public void testHeadroomWithMaxCap() throws Exception {
// Now, only user_0 should be active since he is the only one with // Now, only user_0 should be active since he is the only one with
// outstanding requests // outstanding requests
assertEquals("There should only be 1 active user!", assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers()); 1, a.getAbstractUsersManager().getNumActiveUsers());
// 1 container to user_0 // 1 container to user_0
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
@ -1309,8 +1315,8 @@ public void testHeadroomWithMaxCap() throws Exception {
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
// TODO, fix headroom in the future patch // TODO, fix headroom in the future patch
assertEquals(1*GB, app_0.getHeadroom().getMemorySize()); assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
// User limit = 4G, 2 in use // User limit = 2G, 2 in use
assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
// the application is not yet active // the application is not yet active
@ -1322,15 +1328,15 @@ public void testHeadroomWithMaxCap() throws Exception {
assertEquals(3*GB, a.getUsedResources().getMemorySize()); assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G assertEquals(0*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G
assertEquals(1*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
// Submit requests for app_1 and set max-cap // Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f); a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList( app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory))); priority, recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit // No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap // and no more containers to queue since it's already at max-cap
@ -1349,7 +1355,7 @@ public void testHeadroomWithMaxCap() throws Exception {
app_1.updateResourceRequests(Collections.singletonList( // unset app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true, TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory))); priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers());
applyCSAssignment(clusterResource, applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_1, a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource), new ResourceLimits(clusterResource),
@ -1375,28 +1381,28 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a, new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a, new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 = final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0); TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 = FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a, new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1); a.submitApplicationAttempt(app_2, user_1);
final ApplicationAttemptId appAttemptId_3 = final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0); TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 = FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_2, a, new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), spyRMContext); a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_3, user_2); a.submitApplicationAttempt(app_3, user_2);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
@ -1414,7 +1420,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
Resource clusterResource = Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16); Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes); when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
// Setup resource-requests // Setup resource-requests
Priority priority = TestUtils.createMockPriority(1); Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList( app_0.updateResourceRequests(Collections.singletonList(
@ -1741,6 +1748,7 @@ public void testReservationExchange() throws Exception {
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
when(csContext.getClusterResource()).thenReturn(Resource.newInstance(8, 1));
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of( Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
@ -3804,7 +3812,7 @@ public void testApplicationQueuePercent()
final String user = "user1"; final String user = "user1";
FiCaSchedulerApp app = FiCaSchedulerApp app =
new FiCaSchedulerApp(appAttId, user, queue, new FiCaSchedulerApp(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext); queue.getAbstractUsersManager(), rmContext);
// Resource request // Resource request
Resource requestedResource = Resource.newInstance(1536, 2); Resource requestedResource = Resource.newInstance(1536, 2);
@ -3819,7 +3827,7 @@ public void testApplicationQueuePercent()
// child of root, its absolute capaicty is also 50%. // child of root, its absolute capaicty is also 50%.
queue = createQueue("test2", null, 0.5f, 0.5f); queue = createQueue("test2", null, 0.5f, 0.5f);
app = new FiCaSchedulerApp(appAttId, user, queue, app = new FiCaSchedulerApp(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext); queue.getAbstractUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource); app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster. // In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(), assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@ -3831,7 +3839,7 @@ public void testApplicationQueuePercent()
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%. // Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f); AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
app = new FiCaSchedulerApp(appAttId, user, qChild, app = new FiCaSchedulerApp(appAttId, user, qChild,
qChild.getActiveUsersManager(), rmContext); qChild.getAbstractUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource); app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster. // In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(), assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@ -3855,7 +3863,7 @@ private AbstractCSQueue createQueue(String name, Queue parent, float capacity,
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics); ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class); AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics); when(queue.getMetrics()).thenReturn(metrics);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager); when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo); when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
QueueCapacities qCaps = mock(QueueCapacities.class); QueueCapacities qCaps = mock(QueueCapacities.class);
when(qCaps.getAbsoluteCapacity((String) any())).thenReturn(absCap); when(qCaps.getAbsoluteCapacity((String) any())).thenReturn(absCap);

View File

@ -1049,7 +1049,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b"); RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
// Each application request 5 * 1GB container // Each application request 50 * 1GB container
am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>()); am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
// NM1 do 50 heartbeats // NM1 do 50 heartbeats
@ -1169,12 +1169,14 @@ public RMNodeLabelsManager createNodeLabelManager() {
csConf.setAccessibleNodeLabels(A, toSet("x")); csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50); csConf.setCapacityByLabel(A, "x", 50);
csConf.setMaximumCapacityByLabel(A, "x", 50); csConf.setMaximumCapacityByLabel(A, "x", 50);
csConf.setUserLimit(A, 200);
final String B = CapacitySchedulerConfiguration.ROOT + ".b"; final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 50); csConf.setCapacity(B, 50);
csConf.setAccessibleNodeLabels(B, toSet("x")); csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50); csConf.setCapacityByLabel(B, "x", 50);
csConf.setMaximumCapacityByLabel(B, "x", 50); csConf.setMaximumCapacityByLabel(B, "x", 50);
csConf.setUserLimit(B, 200);
// set node -> label // set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of( mgr.addToCluserNodeLabels(ImmutableSet.of(
@ -1207,6 +1209,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
for (int i = 0; i < 50; i++) { for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
} }
@ -1250,7 +1253,7 @@ private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
} }
} }
private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum) private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
throws InterruptedException { throws InterruptedException {
int totalWaitTick = 100; // wait 10 sec at most. int totalWaitTick = 100; // wait 10 sec at most.