YARN-5889. Improve and refactor user-limit calculation in Capacity Scheduler. (Sunil G via wangda)

Wangda Tan 2017-02-09 10:23:50 -08:00
parent b6bb99c18a
commit 5fb723bb77
24 changed files with 1303 additions and 583 deletions

FifoIntraQueuePreemptionPlugin.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -264,8 +265,9 @@ public class FifoIntraQueuePreemptionPlugin
// Verify whether we already calculated headroom for this user.
if (userLimitResource == null) {
userLimitResource = Resources.clone(tq.leafQueue
.getUserLimitPerUser(userName, partitionBasedResource, partition));
userLimitResource = Resources.clone(
tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
Resource amUsed = perUserAMUsed.get(userName);
if (null == amUsed) {

AbstractUsersManager.java

@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
/**
* {@link AbstractUsersManager} tracks users in the system.
*/
@Private
public interface AbstractUsersManager {
/**
* An application has new outstanding requests.
*
* @param user
* application user
* @param applicationId
* activated application
*/
void activateApplication(String user, ApplicationId applicationId);
/**
* An application has no more outstanding requests.
*
* @param user
* application user
* @param applicationId
* deactivated application
*/
void deactivateApplication(String user, ApplicationId applicationId);
/**
* Get number of active users i.e. users with applications which have pending
* resource requests.
*
* @return number of active users
*/
int getNumActiveUsers();
}
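
(Aside, not part of the patch: a minimal sketch of what an implementation of this contract looks like, using only JDK types. The class name SimpleUsersTracker and the String stand-in for ApplicationId are illustrative.)

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: a user is "active" while it has at least one
// outstanding application, mirroring the contract above.
public class SimpleUsersTracker {
  private final Map<String, Set<String>> usersApplications = new HashMap<>();

  public synchronized void activateApplication(String user, String appId) {
    usersApplications.computeIfAbsent(user, u -> new HashSet<>()).add(appId);
  }

  public synchronized void deactivateApplication(String user, String appId) {
    Set<String> apps = usersApplications.get(user);
    if (apps != null && apps.remove(appId) && apps.isEmpty()) {
      usersApplications.remove(user); // last outstanding app gone
    }
  }

  public synchronized int getNumActiveUsers() {
    return usersApplications.size();
  }
}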

ActiveUsersManager.java

@@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.server.utils.Lock;
* An active user is defined as someone with outstanding resource requests.
*/
@Private
public class ActiveUsersManager {
public class ActiveUsersManager implements AbstractUsersManager {
private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);
private final QueueMetrics metrics;
@@ -45,7 +45,7 @@ public class ActiveUsersManager {
private int activeUsers = 0;
private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>();
public ActiveUsersManager(QueueMetrics metrics) {
this.metrics = metrics;
}
@@ -57,6 +57,7 @@ public class ActiveUsersManager {
* @param applicationId activated application
*/
@Lock({Queue.class, SchedulerApplicationAttempt.class})
@Override
synchronized public void activateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -65,8 +66,10 @@ public class ActiveUsersManager {
usersApplications.put(user, userApps);
++activeUsers;
metrics.incrActiveUsers();
LOG.debug("User " + user + " added to activeUsers, currently: " +
activeUsers);
if (LOG.isDebugEnabled()) {
LOG.debug("User " + user + " added to activeUsers, currently: "
+ activeUsers);
}
}
if (userApps.add(applicationId)) {
metrics.activateApp(user);
@@ -80,6 +83,7 @@ public class ActiveUsersManager {
* @param applicationId deactivated application
*/
@Lock({Queue.class, SchedulerApplicationAttempt.class})
@Override
synchronized public void deactivateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -91,18 +95,21 @@ public class ActiveUsersManager {
usersApplications.remove(user);
--activeUsers;
metrics.decrActiveUsers();
LOG.debug("User " + user + " removed from activeUsers, currently: " +
activeUsers);
if (LOG.isDebugEnabled()) {
LOG.debug("User " + user + " removed from activeUsers, currently: "
+ activeUsers);
}
}
}
}
/**
* Get number of active users i.e. users with applications which have pending
* resource requests.
* @return number of active users
*/
@Lock({Queue.class, SchedulerApplicationAttempt.class})
@Override
synchronized public int getNumActiveUsers() {
return activeUsers;
}

AppSchedulingInfo.java

@@ -73,7 +73,7 @@ public class AppSchedulingInfo {
private final String user;
private Queue queue;
private ActiveUsersManager activeUsersManager;
private AbstractUsersManager abstractUsersManager;
// whether accepted/allocated by scheduler
private volatile boolean pending = true;
private ResourceUsage appResourceUsage;
@@ -99,13 +99,13 @@ public class AppSchedulingInfo {
public final ContainerUpdateContext updateContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
long epoch, ResourceUsage appResourceUsage) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
this.user = user;
this.activeUsersManager = activeUsersManager;
this.abstractUsersManager = abstractUsersManager;
this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
@@ -477,7 +477,7 @@ public class AppSchedulingInfo {
// Activate application. Metrics activation is done here.
if (lastRequestContainers <= 0) {
incrementSchedulerKeyReference(schedulerKey);
activeUsersManager.activateApplication(user, applicationId);
abstractUsersManager.activateApplication(user, applicationId);
}
}
@@ -735,7 +735,7 @@ public class AppSchedulingInfo {
public void checkForDeactivation() {
if (schedulerKeys.isEmpty()) {
activeUsersManager.deactivateApplication(user, applicationId);
abstractUsersManager.deactivateApplication(user, applicationId);
}
}
@@ -763,9 +763,9 @@ public class AppSchedulingInfo {
}
oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this);
activeUsersManager.deactivateApplication(user, applicationId);
activeUsersManager = newQueue.getActiveUsersManager();
activeUsersManager.activateApplication(user, applicationId);
abstractUsersManager.deactivateApplication(user, applicationId);
abstractUsersManager = newQueue.getAbstractUsersManager();
abstractUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
} finally {
this.writeLock.unlock();

Queue.java

@@ -63,7 +63,7 @@ public interface Queue {
boolean hasAccess(QueueACL acl, UserGroupInformation user);
public ActiveUsersManager getActiveUsersManager();
public AbstractUsersManager getAbstractUsersManager();
/**
* Recover the state of the queue for a given container.

SchedulerApplicationAttempt.java

@@ -197,13 +197,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
Preconditions.checkNotNull(rmContext, "RMContext should not be null");
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.queue = queue;
this.pendingRelease = Collections.newSetFromMap(
new ConcurrentHashMap<ContainerId, Boolean>());

CSQueue.java

@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -262,10 +262,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
ResourceLimits resourceLimits);
/**
* Get the {@link ActiveUsersManager} for the queue.
* @return the <code>ActiveUsersManager</code> for the queue
* Get the {@link AbstractUsersManager} for the queue.
* @return the <code>AbstractUsersManager</code> for the queue
*/
public ActiveUsersManager getActiveUsersManager();
public AbstractUsersManager getAbstractUsersManager();
/**
* Adds all applications in the queue and its subqueues to the given collection.

CapacityHeadroomProvider.java

@@ -26,12 +26,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class CapacityHeadroomProvider {
LeafQueue.User user;
UsersManager.User user;
LeafQueue queue;
FiCaSchedulerApp application;
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue,
FiCaSchedulerApp application,
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {

CapacityScheduler.java

@@ -23,7 +23,6 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -751,7 +750,7 @@ public class CapacityScheduler extends
CSQueue queue = (CSQueue) application.getQueue();
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
application.getUser(), queue, queue.getActiveUsersManager(),
application.getUser(), queue, queue.getAbstractUsersManager(),
rmContext, application.getPriority(), isAttemptRecovering,
activitiesManager);
if (transferStateFromPreviousAttempt) {

LeafQueue.java

@@ -21,9 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -56,7 +53,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@@ -67,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -80,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPo
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -92,8 +88,6 @@ public class LeafQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(LeafQueue.class);
private float absoluteUsedCapacity = 0.0f;
private volatile int userLimit;
private volatile float userLimitFactor;
protected int maxApplications;
protected volatile int maxApplicationsPerUser;
@@ -112,14 +106,12 @@ public class LeafQueue extends AbstractCSQueue {
private volatile float minimumAllocationFactor;
private Map<String, User> users = new ConcurrentHashMap<>();
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext scheduler;
private final ActiveUsersManager activeUsersManager;
private final UsersManager usersManager;
// cache last cluster resource to compute actual capacity
private Resource lastClusterResource = Resources.none();
@@ -131,10 +123,6 @@ public class LeafQueue extends AbstractCSQueue {
private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
// Summation of consumed ratios for all users in queue
private float totalUserConsumedRatio = 0;
private UsageRatios qUsageRatios;
// record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -149,13 +137,12 @@ public class LeafQueue extends AbstractCSQueue {
super(cs, queueName, parent, old);
this.scheduler = cs;
this.activeUsersManager = new ActiveUsersManager(metrics);
this.usersManager = new UsersManager(metrics, this, labelManager, scheduler,
resourceCalculator);
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
qUsageRatios = new UsageRatios();
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath());
@@ -187,8 +174,8 @@ public class LeafQueue extends AbstractCSQueue {
setOrderingPolicy(
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
@@ -202,7 +189,8 @@ public class LeafQueue extends AbstractCSQueue {
}
}
maxApplicationsPerUser = Math.min(maxApplications,
(int) (maxApplications * (userLimit / 100.0f) * userLimitFactor));
(int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
* usersManager.getUserLimitFactor()));
maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent(
@@ -260,8 +248,9 @@ public class LeafQueue extends AbstractCSQueue {
+ queueCapacities.getAbsoluteMaximumCapacity()
+ " [= 1.0 maximumCapacity undefined, "
+ "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
+ "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]"
+ "\n" + "userLimitFactor = " + userLimitFactor
+ "\n" + "userLimit = " + usersManager.getUserLimit()
+ " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
+ usersManager.getUserLimitFactor()
+ " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
+ maxApplications
+ " [= configuredMaximumSystemApplicationsPerQueue or"
@@ -322,9 +311,17 @@ public class LeafQueue extends AbstractCSQueue {
return maxApplicationsPerUser;
}
/**
*
* @return UsersManager instance.
*/
public UsersManager getUsersManager() {
return usersManager;
}
@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
public AbstractUsersManager getAbstractUsersManager() {
return usersManager;
}
@Override
@@ -338,7 +335,8 @@ public class LeafQueue extends AbstractCSQueue {
*/
@VisibleForTesting
void setUserLimit(int userLimit) {
this.userLimit = userLimit;
usersManager.setUserLimit(userLimit);
usersManager.userLimitNeedsRecompute();
}
/**
@@ -347,7 +345,8 @@ public class LeafQueue extends AbstractCSQueue {
*/
@VisibleForTesting
void setUserLimitFactor(float userLimitFactor) {
this.userLimitFactor = userLimitFactor;
usersManager.setUserLimitFactor(userLimitFactor);
usersManager.userLimitNeedsRecompute();
}
@Override
@@ -408,12 +407,12 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public int getUserLimit() {
return userLimit;
return usersManager.getUserLimit();
}
@Private
public float getUserLimitFactor() {
return userLimitFactor;
return usersManager.getUserLimitFactor();
}
@Override
@@ -463,44 +462,7 @@ public class LeafQueue extends AbstractCSQueue {
@VisibleForTesting
public User getUser(String userName) {
return users.get(userName);
}
// Get and add user if absent
private User getUserAndAddIfAbsent(String userName) {
try {
writeLock.lock();
User u = users.get(userName);
if (null == u) {
u = new User();
users.put(userName, u);
}
return u;
} finally {
writeLock.unlock();
}
}
/**
* @return an ArrayList of UserInfo objects who are active in this queue
*/
public ArrayList<UserInfo> getUsers() {
try {
readLock.lock();
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry : users.entrySet()) {
User user = entry.getValue();
usersToReturn.add(
new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
user.getActiveApplications(), user.getPendingApplications(),
Resources.clone(user.getConsumedAMResources()),
Resources.clone(user.getUserResourceLimit()),
user.getResourceUsage()));
}
return usersToReturn;
} finally {
readLock.unlock();
}
return usersManager.getUser(userName);
}
@Private
@@ -561,7 +523,7 @@ public class LeafQueue extends AbstractCSQueue {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = getUserAndAddIfAbsent(userName);
User user = usersManager.getUserAndAddIfAbsent(userName);
// Add the attempt to our data-structures
addApplicationAttempt(application, user);
@@ -618,7 +580,7 @@ public class LeafQueue extends AbstractCSQueue {
}
// Check submission limits for the user on this queue
User user = getUserAndAddIfAbsent(userName);
User user = usersManager.getUserAndAddIfAbsent(userName);
if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
String msg = "Queue " + getQueuePath() + " already has " + user
.getTotalApplications() + " applications from user " + userName
@@ -668,19 +630,21 @@ public class LeafQueue extends AbstractCSQueue {
* the absolute queue capacity (per partition) instead of the max and is
* modified by the userlimit and the userlimit factor as is the userlimit
*/
float effectiveUserLimit = Math.max(userLimit / 100.0f,
1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
resourceCalculator,
labelManager.getResourceByLabel(nodePartition, lastClusterResource),
queueCapacities.getAbsoluteCapacity(nodePartition),
minimumAllocation);
Resource queuePartitionResource = Resources
.multiplyAndNormalizeUp(resourceCalculator,
labelManager.getResourceByLabel(nodePartition,
lastClusterResource),
queueCapacities.getAbsoluteCapacity(nodePartition),
minimumAllocation);
Resource userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition)
* effectiveUserLimit * userLimitFactor, minimumAllocation);
* effectiveUserLimit * usersManager.getUserLimitFactor(),
minimumAllocation);
return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ?
userAMLimit :
@@ -895,7 +859,7 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager
activeUsersManager.deactivateApplication(user, application);
usersManager.deactivateApplication(user, application);
appFinished();
@@ -917,7 +881,7 @@ public class LeafQueue extends AbstractCSQueue {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = getUserAndAddIfAbsent(userName);
User user = usersManager.getUserAndAddIfAbsent(userName);
String partitionName = application.getAppAMNodePartitionName();
boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
@@ -935,7 +899,7 @@ public class LeafQueue extends AbstractCSQueue {
user.finishApplication(wasActive);
if (user.getTotalApplications() == 0) {
users.remove(application.getUser());
usersManager.removeUser(application.getUser());
}
// Check if we can activate more applications
@@ -1282,7 +1246,7 @@ public class LeafQueue extends AbstractCSQueue {
Resource clusterResource, FiCaSchedulerApp application,
String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
computeUserLimit(application.getUser(), clusterResource, user,
getResourceLimitForActiveUsers(application.getUser(), clusterResource,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
partition);
}
@@ -1356,7 +1320,7 @@ public class LeafQueue extends AbstractCSQueue {
// Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also
Resource userLimit =
computeUserLimit(application.getUser(), clusterResource, queueUser,
getResourceLimitForActiveUsers(application.getUser(), clusterResource,
nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource);
@@ -1366,11 +1330,11 @@ public class LeafQueue extends AbstractCSQueue {
clusterResource, userLimit, nodePartition);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
" queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
" consumed=" + queueUser.getUsed() +
" headroom=" + headroom);
LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
+ userLimit + " queueMaxAvailRes="
+ cachedResourceLimitsForHeadroom.getLimit() + " consumed="
+ queueUser.getUsed() + " headroom=" + headroom + " partition="
+ nodePartition);
}
CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
@@ -1393,129 +1357,46 @@ public class LeafQueue extends AbstractCSQueue {
return rackLocalityFullReset;
}
@Lock(NoLock.class)
private Resource computeUserLimit(String userName,
Resource clusterResource, User user,
String nodePartition, SchedulingMode schedulingMode) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
clusterResource);
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
// we're running below capacity. The 'max' ensures that jobs in queues
// with miniscule capacity (< 1 slot) make progress
// * If we're running over capacity, then its
// (usedResources + required) (which extra resources we are allocating)
Resource queueCapacity =
Resources.multiplyAndNormalizeUp(resourceCalculator,
partitionResource,
queueCapacities.getAbsoluteCapacity(nodePartition),
minimumAllocation);
// Assume we have required resource equals to minimumAllocation, this can
// make sure user limit can continuously increase till queueMaxResource
// reached.
Resource required = minimumAllocation;
// Allow progress for queues with miniscule capacity
queueCapacity =
Resources.max(
resourceCalculator, partitionResource,
queueCapacity,
required);
/* We want to base the userLimit calculation on
* max(queueCapacity, usedResources+required). However, we want
* usedResources to be based on the combined ratios of all the users in the
* queue so we use consumedRatio to calculate such.
* The calculation is dependent on how the resourceCalculator calculates the
* ratio between two Resources. DRF Example: If usedResources is
* greater than queueCapacity and users have the following [mem,cpu] usages:
* User1: [10%,20%] - Dominant resource is 20%
* User2: [30%,10%] - Dominant resource is 30%
* Then total consumedRatio is then 20+30=50%. Yes, this value can be
* larger than 100% but for the purposes of making sure all users are
* getting their fair share, it works.
*/
Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
partitionResource, qUsageRatios.getUsageRatio(nodePartition),
minimumAllocation);
Resource currentCapacity =
Resources.lessThan(resourceCalculator, partitionResource, consumed,
queueCapacity) ? queueCapacity : Resources.add(consumed, required);
// Never allow a single user to take more than the
// queue's configured capacity * user-limit-factor.
// Also, the queue's configured capacity should be higher than
// queue-hard-limit * ulMin
final int activeUsers = activeUsersManager.getNumActiveUsers();
// User limit resource is determined by:
// max{currentCapacity / #activeUsers, currentCapacity *
// user-limit-percentage%)
Resource userLimitResource = Resources.max(
resourceCalculator, partitionResource,
Resources.divideAndCeil(
resourceCalculator, currentCapacity, activeUsers),
Resources.divideAndCeil(
resourceCalculator,
Resources.multiplyAndRoundDown(
currentCapacity, userLimit),
100)
);
// User limit is capped by maxUserLimit
// - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
// - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
//
// In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
// partition, its guaranteed resource on that partition is 0. And
// user-limit-factor computation is based on queue's guaranteed capacity. So
// we will not cap user-limit as well as used resource when doing
// IGNORE_PARTITION_EXCLUSIVITY allocation.
Resource maxUserLimit = Resources.none();
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
maxUserLimit =
Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
maxUserLimit = partitionResource;
}
// Cap final user limit with maxUserLimit
userLimitResource =
Resources.roundUp(
resourceCalculator,
Resources.min(
resourceCalculator, partitionResource,
userLimitResource,
maxUserLimit
),
minimumAllocation);
if (LOG.isDebugEnabled()) {
LOG.debug("User limit computation for " + userName +
" in queue " + getQueueName() +
" userLimitPercent=" + userLimit +
" userLimitFactor=" + userLimitFactor +
" required: " + required +
" consumed: " + consumed +
" user-limit-resource: " + userLimitResource +
" queueCapacity: " + queueCapacity +
" qconsumed: " + queueUsage.getUsed() +
" consumedRatio: " + totalUserConsumedRatio +
" currentCapacity: " + currentCapacity +
" activeUsers: " + activeUsers +
" clusterCapacity: " + clusterResource +
" resourceByLabel: " + partitionResource +
" usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
" Partition: " + nodePartition
);
}
user.setUserResourceLimit(userLimitResource);
return userLimitResource;
/**
*
* @param userName
* Name of user who has submitted one/more app to given queue.
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
public Resource getResourceLimitForActiveUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
return usersManager.getComputedResourceLimitForActiveUsers(userName,
clusterResource, nodePartition, schedulingMode);
}
/**
*
* @param userName
* Name of user who has submitted one/more app to given queue.
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
public Resource getResourceLimitForAllUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
return usersManager.getComputedResourceLimitForAllUsers(userName,
clusterResource, nodePartition, schedulingMode);
}
@Private
protected boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
@@ -1620,52 +1501,34 @@ public class LeafQueue extends AbstractCSQueue {
}
}
private float calculateUserUsageRatio(Resource clusterResource,
/**
* Recalculate QueueUsage Ratio.
*
* @param clusterResource
* Total Cluster Resource
* @param nodePartition
* Partition
*/
public void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
try {
writeLock.lock();
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
clusterResource);
float consumed = 0;
User user;
for (Map.Entry<String, User> entry : users.entrySet()) {
user = entry.getValue();
consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
resourceByLabel, nodePartition);
}
return consumed;
} finally {
writeLock.unlock();
}
}
private void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
try {
writeLock.lock();
ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
ResourceUsage queueResourceUsage = getQueueResourceUsage();
if (nodePartition == null) {
for (String partition : Sets.union(
queueCapacities.getNodePartitionsSet(),
getQueueCapacities().getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) {
qUsageRatios.setUsageRatio(partition,
calculateUserUsageRatio(clusterResource, partition));
usersManager.updateUsageRatio(partition, clusterResource);
}
} else{
qUsageRatios.setUsageRatio(nodePartition,
calculateUserUsageRatio(clusterResource, nodePartition));
} else {
usersManager.updateUsageRatio(nodePartition, clusterResource);
}
} finally {
writeLock.unlock();
}
}
private void updateQueueUsageRatio(String nodePartition,
float delta) {
qUsageRatios.incUsageRatio(nodePartition, delta);
}
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
@@ -1740,8 +1603,6 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition,
isIncreasedAllocation);
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
clusterResource);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1760,16 +1621,9 @@ public class LeafQueue extends AbstractCSQueue {
// Update user metrics
String userName = application.getUser();
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
User user = getUserAndAddIfAbsent(userName);
user.assignContainer(resource, nodePartition);
// Update usage ratios
updateQueueUsageRatio(nodePartition,
user.updateUsageRatio(resourceCalculator, resourceByLabel,
nodePartition));
// Increment user's resource usage.
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, true);
// Note this is a bit unconventional since it gets the object and modifies
// it here, rather then using set routine
@@ -1777,9 +1631,10 @@ public class LeafQueue extends AbstractCSQueue {
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage
.getUsed() + " numContainers=" + numContainers + " headroom = "
+ application.getHeadroom() + " user-resources=" + user.getUsed());
LOG.debug(getQueueName() + " user=" + userName + " used="
+ queueUsage.getUsed(nodePartition) + " numContainers="
+ numContainers + " headroom = " + application.getHeadroom()
+ " user-resources=" + user.getUsed());
}
} finally {
writeLock.unlock();
@@ -1793,8 +1648,6 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.lock();
super.releaseResource(clusterResource, resource, nodePartition,
isChangeResource);
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
clusterResource);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1812,13 +1665,8 @@ public class LeafQueue extends AbstractCSQueue {
// Update user metrics
String userName = application.getUser();
User user = getUserAndAddIfAbsent(userName);
user.releaseContainer(resource, nodePartition);
// Update usage ratios
updateQueueUsageRatio(nodePartition,
user.updateUsageRatio(resourceCalculator, resourceByLabel,
nodePartition));
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, false);
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
@@ -1877,6 +1725,10 @@ public class LeafQueue extends AbstractCSQueue {
// activate the pending applications if possible
activateApplications();
// In case of any resource change, invalidate recalculateULCount to clear
// the computed user-limit.
usersManager.userLimitNeedsRecompute();
// Update application properties
for (FiCaSchedulerApp application : orderingPolicy
.getSchedulableEntities()) {
@@ -1892,16 +1744,16 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel,
resourceToInc);
usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
nodeLabel, true);
super.incUsedResource(nodeLabel, resourceToInc, application);
}
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel,
resourceToDec);
usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
nodeLabel, false);
super.decUsedResource(nodeLabel, resourceToDec, application);
}
@@ -1921,191 +1773,6 @@ public class LeafQueue extends AbstractCSQueue {
queueUsage.decAMUsed(nodeLabel, resourceToDec);
}
/*
* Usage Ratio
*/
static private class UsageRatios {
private Map<String, Float> usageRatios;
private ReadLock readLock;
private WriteLock writeLock;
public UsageRatios() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
usageRatios = new HashMap<String, Float>();
}
private void incUsageRatio(String label, float delta) {
try {
writeLock.lock();
Float fl = usageRatios.get(label);
if (null == fl) {
fl = new Float(0.0);
}
fl += delta;
usageRatios.put(label, new Float(fl));
} finally {
writeLock.unlock();
}
}
float getUsageRatio(String label) {
try {
readLock.lock();
Float f = usageRatios.get(label);
if (null == f) {
return 0.0f;
}
return f;
} finally {
readLock.unlock();
}
}
private void setUsageRatio(String label, float ratio) {
try {
writeLock.lock();
usageRatios.put(label, new Float(ratio));
} finally {
writeLock.unlock();
}
}
}
@VisibleForTesting
public float getUsageRatio(String label) {
return qUsageRatios.getUsageRatio(label);
}
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
volatile Resource userResourceLimit = Resource.newInstance(0, 0);
volatile int pendingApplications = 0;
volatile int activeApplications = 0;
private UsageRatios userUsageRatios = new UsageRatios();
private WriteLock writeLock;
User() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Nobody uses read-lock now, will add it when necessary
writeLock = lock.writeLock();
}
public ResourceUsage getResourceUsage() {
return userResourceUsage;
}
public float resetAndUpdateUsageRatio(
ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
userUsageRatios.setUsageRatio(nodePartition, 0);
return updateUsageRatio(resourceCalculator, resource, nodePartition);
} finally {
writeLock.unlock();
}
}
public float updateUsageRatio(
ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
float delta;
float newRatio = Resources.ratio(resourceCalculator,
getUsed(nodePartition), resource);
delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
userUsageRatios.setUsageRatio(nodePartition, newRatio);
return delta;
} finally {
writeLock.unlock();
}
}
public Resource getUsed() {
return userResourceUsage.getUsed();
}
public Resource getAllUsed() {
return userResourceUsage.getAllUsed();
}
public Resource getUsed(String label) {
return userResourceUsage.getUsed(label);
}
public int getPendingApplications() {
return pendingApplications;
}
public int getActiveApplications() {
return activeApplications;
}
public Resource getConsumedAMResources() {
return userResourceUsage.getAMUsed();
}
public Resource getConsumedAMResources(String label) {
return userResourceUsage.getAMUsed(label);
}
public int getTotalApplications() {
return getPendingApplications() + getActiveApplications();
}
public void submitApplication() {
try {
writeLock.lock();
++pendingApplications;
} finally {
writeLock.unlock();
}
}
public void activateApplication() {
try {
writeLock.lock();
--pendingApplications;
++activeApplications;
} finally {
writeLock.unlock();
}
}
public void finishApplication(boolean wasActive) {
try {
writeLock.lock();
if (wasActive) {
--activeApplications;
} else{
--pendingApplications;
}
} finally {
writeLock.unlock();
}
}
public void assignContainer(Resource resource, String nodePartition) {
userResourceUsage.incUsed(nodePartition, resource);
}
public void releaseContainer(Resource resource, String nodePartition) {
userResourceUsage.decUsed(nodePartition, resource);
}
public Resource getUserResourceLimit() {
return userResourceLimit;
}
public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
}
@Override
public void recoverContainer(Resource clusterResource,
SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
@@ -2175,9 +1842,9 @@ public class LeafQueue extends AbstractCSQueue {
* excessive preemption.
* @return Total pending resource considering user limit
*/
public Resource getTotalPendingResourcesConsideringUserLimit(
Resource clusterResources, String partition, boolean deductReservedFromPending) {
Resource clusterResources, String partition,
boolean deductReservedFromPending) {
try {
readLock.lock();
Map<String, Resource> userNameToHeadroom =
@@ -2188,8 +1855,8 @@ public class LeafQueue extends AbstractCSQueue {
if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName);
Resource headroom = Resources.subtract(
computeUserLimit(app.getUser(), clusterResources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
getResourceLimitForActiveUsers(app.getUser(), clusterResources,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition));
// Make sure headroom is not negative.
headroom = Resources.componentwiseMax(headroom, Resources.none());
@@ -2219,16 +1886,6 @@ public class LeafQueue extends AbstractCSQueue {
}
public synchronized Resource getUserLimitPerUser(String userName,
Resource resources, String partition) {
// Check user resource limit
User user = getUser(userName);
return computeUserLimit(userName, resources, user, partition,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {

ParentQueue.java

@@ -897,7 +897,7 @@ public class ParentQueue extends AbstractCSQueue {
}
@Override
public ActiveUsersManager getActiveUsersManager() {
public ActiveUsersManager getAbstractUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}

UsersManager.java

@@ -0,0 +1,982 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
/**
* {@link UsersManager} tracks users in the system and its respective data
* structures.
*/
@Private
public class UsersManager implements AbstractUsersManager {
private static final Log LOG = LogFactory.getLog(UsersManager.class);
/*
* Member declaration for UsersManager class.
*/
private final LeafQueue lQueue;
private final RMNodeLabelsManager labelManager;
private final ResourceCalculator resourceCalculator;
private final CapacitySchedulerContext scheduler;
private Map<String, User> users = new ConcurrentHashMap<>();
private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage();
private ResourceUsage totalResUsageForNonActiveUsers = new ResourceUsage();
private Set<String> activeUsersSet = new HashSet<String>();
private Set<String> nonActiveUsersSet = new HashSet<String>();
// Summation of consumed ratios for all users in queue
private UsageRatios qUsageRatios;
// To detect whether there is a change in user count for every user-limit
// calculation.
private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
new HashMap<String, Map<SchedulingMode, Long>>();
private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
new HashMap<String, Map<SchedulingMode, Long>>();
private volatile int userLimit;
private volatile float userLimitFactor;
private WriteLock writeLock;
private ReadLock readLock;
private final QueueMetrics metrics;
private AtomicInteger activeUsers = new AtomicInteger(0);
private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>();
// Pre-computed list of user-limits.
Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
/**
* UsageRatios will store the total used resources ratio across all users of
* the queue.
*/
static private class UsageRatios {
private Map<String, Float> usageRatios;
private ReadLock readLock;
private WriteLock writeLock;
public UsageRatios() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
usageRatios = new HashMap<String, Float>();
}
private void incUsageRatio(String label, float delta) {
try {
writeLock.lock();
float usage = 0f;
if (usageRatios.containsKey(label)) {
usage = usageRatios.get(label);
}
usage += delta;
usageRatios.put(label, usage);
} finally {
writeLock.unlock();
}
}
private float getUsageRatio(String label) {
try {
readLock.lock();
Float f = usageRatios.get(label);
if (null == f) {
return 0.0f;
}
return f;
} finally {
readLock.unlock();
}
}
private void setUsageRatio(String label, float ratio) {
try {
writeLock.lock();
usageRatios.put(label, ratio);
} finally {
writeLock.unlock();
}
}
} /* End of UserRatios class */
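// Aside (illustrative, not part of the patch): the queue-wide ratio per
// label above is maintained as a running sum of per-user deltas. A
// self-contained, JDK-only sketch of the same bookkeeping, with
// hypothetical names (RatioRollup, update):
import java.util.HashMap;
import java.util.Map;
public class RatioRollup {
private final Map<String, Float> queueRatio = new HashMap<>(); // label -> sum
private final Map<String, Float> userRatio = new HashMap<>(); // user/label -> ratio
// usedFraction = one user's used resource / partition resource, per label.
public synchronized void update(String user, String label, float usedFraction) {
String key = user + "/" + label;
float delta = usedFraction - userRatio.getOrDefault(key, 0f); // as in updateUsageRatio
userRatio.put(key, usedFraction); // like setUsageRatio
queueRatio.merge(label, delta, Float::sum); // like incUsageRatio
}
public synchronized float getUsageRatio(String label) {
return queueRatio.getOrDefault(label, 0f);
}
}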
/**
* User class stores all user related resource usage, application details.
*/
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
String userName = null;
volatile Resource userResourceLimit = Resource.newInstance(0, 0);
private volatile AtomicInteger pendingApplications = new AtomicInteger(0);
private volatile AtomicInteger activeApplications = new AtomicInteger(0);
private UsageRatios userUsageRatios = new UsageRatios();
private WriteLock writeLock;
public User(String name) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Nobody uses read-lock now, will add it when necessary
writeLock = lock.writeLock();
this.userName = name;
}
public ResourceUsage getResourceUsage() {
return userResourceUsage;
}
public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
userUsageRatios.setUsageRatio(nodePartition, 0);
return updateUsageRatio(resourceCalculator, resource, nodePartition);
} finally {
writeLock.unlock();
}
}
public float updateUsageRatio(ResourceCalculator resourceCalculator,
Resource resource, String nodePartition) {
try {
writeLock.lock();
float delta;
float newRatio = Resources.ratio(resourceCalculator,
getUsed(nodePartition), resource);
delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
userUsageRatios.setUsageRatio(nodePartition, newRatio);
return delta;
} finally {
writeLock.unlock();
}
}
public Resource getUsed() {
return userResourceUsage.getUsed();
}
public Resource getAllUsed() {
return userResourceUsage.getAllUsed();
}
public Resource getUsed(String label) {
return userResourceUsage.getUsed(label);
}
public int getPendingApplications() {
return pendingApplications.get();
}
public int getActiveApplications() {
return activeApplications.get();
}
public Resource getConsumedAMResources() {
return userResourceUsage.getAMUsed();
}
public Resource getConsumedAMResources(String label) {
return userResourceUsage.getAMUsed(label);
}
public int getTotalApplications() {
return getPendingApplications() + getActiveApplications();
}
public void submitApplication() {
pendingApplications.incrementAndGet();
}
public void activateApplication() {
pendingApplications.decrementAndGet();
activeApplications.incrementAndGet();
}
public void finishApplication(boolean wasActive) {
if (wasActive) {
activeApplications.decrementAndGet();
} else {
pendingApplications.decrementAndGet();
}
}
public Resource getUserResourceLimit() {
return userResourceLimit;
}
public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
} /* End of User class */
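// Aside (illustrative, not part of the patch): a short walk-through of the
// application counters defined above. UserLifecycleDemo is hypothetical;
// User is the class above, assuming the resourcemanager jar on the classpath.
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
public class UserLifecycleDemo {
public static void main(String[] args) {
User alice = new User("alice");
alice.submitApplication(); // pending=1, active=0
alice.activateApplication(); // pending=0, active=1
System.out.println(alice.getTotalApplications()); // 1
alice.finishApplication(true); // was active, so active=0
System.out.println(alice.getTotalApplications()); // 0
}
}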
/**
* UsersManager Constructor.
*
* @param metrics
* Queue Metrics
* @param lQueue
* Leaf Queue Object
* @param labelManager
* Label Manager instance
* @param scheduler
* Capacity Scheduler Context
* @param resourceCalculator
* rc
*/
public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
ResourceCalculator resourceCalculator) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.lQueue = lQueue;
this.scheduler = scheduler;
this.labelManager = labelManager;
this.resourceCalculator = resourceCalculator;
this.qUsageRatios = new UsageRatios();
this.metrics = metrics;
this.writeLock = lock.writeLock();
this.readLock = lock.readLock();
}
/**
* Get configured user-limit.
* @return user limit
*/
public int getUserLimit() {
return userLimit;
}
/**
* Set configured user-limit.
* @param userLimit user limit
*/
public void setUserLimit(int userLimit) {
this.userLimit = userLimit;
}
/**
* Get configured user-limit factor.
* @return user-limit factor
*/
public float getUserLimitFactor() {
return userLimitFactor;
}
/**
* Set configured user-limit factor.
* @param userLimitFactor User Limit factor.
*/
public void setUserLimitFactor(float userLimitFactor) {
this.userLimitFactor = userLimitFactor;
}
@VisibleForTesting
public float getUsageRatio(String label) {
return qUsageRatios.getUsageRatio(label);
}
/**
* Force UsersManager to recompute userlimit.
*/
public void userLimitNeedsRecompute() {
// If latestVersionOfUsersState is negative due to overflow, ideally we need
// to reset it. This method is invoked from UsersManager and LeafQueue and
// all is happening within write/readLock. Below logic can help to set 0.
try {
writeLock.lock();
long value = latestVersionOfUsersState.incrementAndGet();
if (value < 0) {
latestVersionOfUsersState.set(0);
}
} finally {
writeLock.unlock();
}
}
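// Aside (illustrative, not part of the patch): userLimitNeedsRecompute() is
// a version-stamp invalidation scheme. A JDK-only sketch of the same
// pattern, with hypothetical names (VersionedCache, invalidateAll, get):
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class VersionedCache<K, V> {
private final AtomicLong latestVersion = new AtomicLong(0);
private final ConcurrentHashMap<K, Long> seenVersion = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, V> values = new ConcurrentHashMap<>();
// Analogue of userLimitNeedsRecompute(): bump once per state change.
public void invalidateAll() {
if (latestVersion.incrementAndGet() < 0) {
latestVersion.set(0); // same overflow guard as above
}
}
// Analogue of isRecomputeNeeded() plus recompute: refresh only when stale.
public V get(K key, Function<K, V> recompute) {
long latest = latestVersion.get();
Long seen = seenVersion.get(key);
if (seen == null || seen.longValue() != latest) {
values.put(key, recompute.apply(key));
seenVersion.put(key, latest);
}
return values.get(key);
}
}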
/*
* Get all users of queue.
*/
private Map<String, User> getUsers() {
return users;
}
/**
* Get user object for given user name.
*
* @param userName
* User Name
* @return User object
*/
public User getUser(String userName) {
return users.get(userName);
}
/**
* Remove user.
*
* @param userName
* User Name
*/
public void removeUser(String userName) {
try {
writeLock.lock();
this.users.remove(userName);
// Remove user from active/non-active list as well.
activeUsersSet.remove(userName);
nonActiveUsersSet.remove(userName);
} finally {
writeLock.unlock();
}
}
/**
* Get and add user if absent.
*
* @param userName
* User Name
* @return User object
*/
public User getUserAndAddIfAbsent(String userName) {
try {
writeLock.lock();
User u = getUser(userName);
if (null == u) {
u = new User(userName);
addUser(userName, u);
// Add to nonActive list so that resourceUsage could be tracked
if (!nonActiveUsersSet.contains(userName)) {
nonActiveUsersSet.add(userName);
}
}
return u;
} finally {
writeLock.unlock();
}
}
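// Aside (illustrative, not part of the patch): new users start in the
// non-active set, above, so their usage is still tracked. A JDK-only sketch
// of two-bucket bookkeeping of this general shape, with hypothetical names;
// the actual movement between sets happens in code beyond this excerpt.
import java.util.HashSet;
import java.util.Set;
public class TwoBucketTracker {
private final Set<String> active = new HashSet<>();
private final Set<String> nonActive = new HashSet<>();
public synchronized void addUser(String user) {
nonActive.add(user); // new users begin as non-active
}
public synchronized void markActive(String user) {
if (nonActive.remove(user)) {
active.add(user); // first outstanding request moves the user over
}
}
public synchronized void markNonActive(String user) {
if (active.remove(user)) {
nonActive.add(user); // no outstanding requests left
}
}
public synchronized void removeUser(String user) { // as in removeUser above
active.remove(user);
nonActive.remove(user);
}
}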
/*
* Add a new user
*/
private void addUser(String userName, User user) {
this.users.put(userName, user);
}
/**
* @return an ArrayList of UserInfo objects who are active in this queue
*/
public ArrayList<UserInfo> getUsersInfo() {
try {
readLock.lock();
ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
for (Map.Entry<String, User> entry : getUsers().entrySet()) {
User user = entry.getValue();
usersToReturn.add(
new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
user.getActiveApplications(), user.getPendingApplications(),
Resources.clone(user.getConsumedAMResources()),
Resources.clone(user.getUserResourceLimit()),
user.getResourceUsage()));
}
return usersToReturn;
} finally {
readLock.unlock();
}
}
/**
* Get computed user-limit for all ACTIVE users in this queue. If cached data
* is invalidated due to resource change, this method also enforce to
* recompute user-limit.
*
* @param userName
* Name of user who has submitted one/more app to given queue.
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
public Resource getComputedResourceLimitForActiveUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode = preComputedActiveUserLimit
.get(nodePartition);
try {
writeLock.lock();
if (isRecomputeNeeded(schedulingMode, nodePartition, true)) {
// recompute
userLimitPerSchedulingMode = reComputeUserLimits(userName,
nodePartition, clusterResource, schedulingMode, true);
// update user count to cache so that we can avoid recompute if no major
// changes.
setLocalVersionOfUsersState(nodePartition, schedulingMode, true);
}
} finally {
writeLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("userLimit is fetched. userLimit = "
+ userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
+ schedulingMode + ", partition=" + nodePartition);
}
return userLimitPerSchedulingMode.get(schedulingMode);
}
/**
* Get computed user-limit for all users in this queue. If cached data is
* invalidated due to resource change, this method also enforce to recompute
* user-limit.
*
* @param userName
* Name of user who has submitted one/more app to given queue.
* @param clusterResource
* total cluster resource
* @param nodePartition
* partition name
* @param schedulingMode
* scheduling mode
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
public Resource getComputedResourceLimitForAllUsers(String userName,
Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
Map<SchedulingMode, Resource> userLimitPerSchedulingMode = preComputedAllUserLimit
.get(nodePartition);
try {
writeLock.lock();
if (isRecomputeNeeded(schedulingMode, nodePartition, false)) {
// recompute
userLimitPerSchedulingMode = reComputeUserLimits(userName,
nodePartition, clusterResource, schedulingMode, false);
// update user count to cache so that we can avoid recompute if no major
// changes.
setLocalVersionOfUsersState(nodePartition, schedulingMode, false);
}
} finally {
writeLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("userLimit is fetched. userLimit = "
+ userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
+ schedulingMode + ", partition=" + nodePartition);
}
return userLimitPerSchedulingMode.get(schedulingMode);
}
/*
* Recompute user-limit under following conditions: 1. cached user-limit does
* not exist in local map. 2. Total User count doesn't match with local cached
* version.
*/
private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
String nodePartition, boolean isActive) {
return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
isActive) != latestVersionOfUsersState.get());
}
/*
* Set Local version of user count per label to invalidate cache if needed.
*/
private void setLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
try {
writeLock.lock();
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
.get(nodePartition);
if (null == localVersion) {
localVersion = new HashMap<SchedulingMode, Long>();
localVersionOfUsersState.put(nodePartition, localVersion);
}
localVersion.put(schedulingMode, latestVersionOfUsersState.get());
} finally {
writeLock.unlock();
}
}
/*
* Get Local version of user count per label to invalidate cache if needed.
*/
private long getLocalVersionOfUsersState(String nodePartition,
SchedulingMode schedulingMode, boolean isActive) {
try {
this.readLock.lock();
Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
? localVersionOfActiveUsersState
: localVersionOfAllUsersState;
if (!localVersionOfUsersState.containsKey(nodePartition)) {
return -1;
}
Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
.get(nodePartition);
if (!localVersion.containsKey(schedulingMode)) {
return -1;
}
return localVersion.get(schedulingMode);
} finally {
readLock.unlock();
}
}
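/*
* The pair of helpers above implements version-stamped caching: every state
* change bumps a global counter (latestVersionOfUsersState), and each cached
* entry remembers the counter value it was computed against, so a mismatch
* forces a recompute. A minimal, self-contained sketch of the same pattern
* follows; VersionedCache and its members are hypothetical illustration
* names, not part of YARN.
*/
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class VersionedCache<K, V> {
  private final AtomicLong latestVersion = new AtomicLong(0);
  private final Map<K, V> values = new ConcurrentHashMap<>();
  private final Map<K, Long> localVersions = new ConcurrentHashMap<>();

  /** Call whenever the underlying state changes,
   *  cf. userLimitNeedsRecompute(). */
  void invalidate() {
    latestVersion.incrementAndGet();
  }

  /** Return the cached value, recomputing only when the stamp is stale. */
  V get(K key, Supplier<V> compute) {
    long latest = latestVersion.get();
    Long seen = localVersions.get(key);
    if (seen == null || seen != latest) {
      values.put(key, compute.get()); // recompute against the latest state
      localVersions.put(key, latest); // remember the version it was built for
    }
    return values.get(key);
  }
}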
private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
String nodePartition, Resource clusterResource,
SchedulingMode schedulingMode, boolean activeMode) {
// Select the stored map based on whether this is an active-user or an
// all-users computation.
Map<String, Map<SchedulingMode, Resource>> computedMap = (activeMode)
? preComputedActiveUserLimit
: preComputedAllUserLimit;
Map<SchedulingMode, Resource> userLimitPerSchedulingMode = computedMap
.get(nodePartition);
if (userLimitPerSchedulingMode == null) {
userLimitPerSchedulingMode = new ConcurrentHashMap<>();
computedMap.put(nodePartition, userLimitPerSchedulingMode);
}
// compute user-limit per scheduling mode.
Resource computedUserLimit = computeUserLimit(userName, clusterResource,
nodePartition, schedulingMode, activeMode);
// update in local storage
userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
return userLimitPerSchedulingMode;
}
private Resource computeUserLimit(String userName, Resource clusterResource,
String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
clusterResource);
/*
* What is our current capacity?
* * It is equal to max(required, queue-capacity) if we're running below
* capacity. The 'max' ensures that jobs in queues with miniscule capacity
* (< 1 slot) make progress.
* * If we're running over capacity, it is (usedResources + required),
* which accounts for the extra resources we are allocating.
*/
Resource queueCapacity = Resources.multiplyAndNormalizeUp(
resourceCalculator, partitionResource,
lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
lQueue.getMinimumAllocation());
/*
* Assume the required resource equals minimumAllocation; this ensures the
* user limit can keep increasing until queueMaxResource is reached.
*/
Resource required = lQueue.getMinimumAllocation();
// Allow progress for queues with miniscule capacity
queueCapacity = Resources.max(resourceCalculator, partitionResource,
queueCapacity, required);
/*
* We want to base the userLimit calculation on max(queueCapacity,
* usedResources + required). However, we want usedResources to be based on
* the combined ratios of all the users in the queue, so we use
* consumedRatio to calculate it. The calculation depends on how the
* resourceCalculator computes the ratio between two Resources.
*
* DRF example: if usedResources is greater than queueCapacity and users
* have the following [mem, cpu] usages:
*   User1: [10%, 20%] - dominant resource is 20%
*   User2: [30%, 10%] - dominant resource is 30%
* The total consumedRatio is then 20% + 30% = 50%. Yes, this value can be
* larger than 100%, but for the purpose of making sure all users get their
* fair share, it works.
*/
Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
partitionResource, getUsageRatio(nodePartition),
lQueue.getMinimumAllocation());
Resource currentCapacity = Resources.lessThan(resourceCalculator,
partitionResource, consumed, queueCapacity)
? queueCapacity
: Resources.add(consumed, required);
/*
* Never allow a single user to take more than the queue's configured
* capacity * user-limit-factor. Also, the queue's configured capacity
* should be higher than queue-hard-limit * ulMin
*/
int usersCount = getNumActiveUsers();
Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
// For the all-users (non-active) calculation, consider the total user count.
if (!activeUser) {
resourceUsed = currentCapacity;
usersCount = users.size();
}
/*
* User limit resource is determined by: max(currentCapacity / #activeUsers,
* currentCapacity * user-limit-percentage / 100)
*/
Resource userLimitResource = Resources.max(resourceCalculator,
partitionResource,
Resources.divideAndCeil(resourceCalculator, resourceUsed,
usersCount),
Resources.divideAndCeil(resourceCalculator,
Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
100));
// User limit is capped by maxUserLimit
// - maxUserLimit = queueCapacity * user-limit-factor
// (RESPECT_PARTITION_EXCLUSIVITY)
// - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
//
// In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
// partition, its guaranteed resource on that partition is 0, and the
// user-limit-factor computation is based on the queue's guaranteed
// capacity. So we do not cap the user-limit (or the used resource) when
// doing IGNORE_PARTITION_EXCLUSIVITY allocation.
Resource maxUserLimit = Resources.none();
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
getUserLimitFactor());
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
maxUserLimit = partitionResource;
}
// Cap final user limit with maxUserLimit
userLimitResource = Resources
.roundUp(resourceCalculator,
Resources.min(resourceCalculator, partitionResource,
userLimitResource, maxUserLimit),
lQueue.getMinimumAllocation());
if (LOG.isDebugEnabled()) {
LOG.debug("User limit computation for " + userName + " in queue "
+ lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
+ " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
+ required + " consumed: " + consumed + " user-limit-resource: "
+ userLimitResource + " queueCapacity: " + queueCapacity
+ " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
+ " currentCapacity: " + currentCapacity + " activeUsers: "
+ usersCount + " clusterCapacity: " + clusterResource
+ " resourceByLabel: " + partitionResource + " usageratio: "
+ getUsageRatio(nodePartition) + " Partition: " + nodePartition);
}
getUser(userName).setUserResourceLimit(userLimitResource);
return userLimitResource;
}
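/*
* To make the computation above concrete, here is toy, memory-only
* arithmetic. All numbers are invented for illustration, and two details
* are simplified away: the active-user variant divides the total usage of
* active users (not currentCapacity) by the user count, and the final
* result is rounded up to the minimum allocation.
*/
public class UserLimitExample {
  public static void main(String[] args) {
    long queueCapacityMb = 8 * 1024;  // queue's guaranteed share, in MB
    long consumedMb = 10 * 1024;      // combined usage ratio mapped back to MB
    long minAllocationMb = 1024;      // "required": lets the limit keep growing
    int activeUsers = 3;
    int userLimitPercent = 25;        // minimum-user-limit-percent
    float userLimitFactor = 2.0f;

    // currentCapacity = queueCapacity while under capacity,
    // otherwise consumed + required
    long currentCapacity = consumedMb < queueCapacityMb
        ? queueCapacityMb : consumedMb + minAllocationMb;

    // userLimit = max(per-user share, configured minimum percentage)
    long perUser = (long) Math.ceil((double) currentCapacity / activeUsers);
    long byPercent = currentCapacity * userLimitPercent / 100;
    long userLimit = Math.max(perUser, byPercent);

    // cap at queueCapacity * user-limit-factor
    // (RESPECT_PARTITION_EXCLUSIVITY)
    long maxUserLimit = (long) (queueCapacityMb * userLimitFactor);
    userLimit = Math.min(userLimit, maxUserLimit);

    System.out.println("user limit (MB) = " + userLimit); // prints 3755
  }
}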
/**
* Recompute the usage ratio of this queue for the given partition.
*
* @param partition
* Node partition
* @param clusterResource
* Cluster Resource
*/
public void updateUsageRatio(String partition, Resource clusterResource) {
try {
writeLock.lock();
Resource resourceByLabel = labelManager.getResourceByLabel(partition,
clusterResource);
float consumed = 0;
User user;
for (Map.Entry<String, User> entry : getUsers().entrySet()) {
user = entry.getValue();
consumed += user.setAndUpdateUsageRatio(resourceCalculator,
resourceByLabel, partition);
}
qUsageRatios.setUsageRatio(partition, consumed);
} finally {
writeLock.unlock();
}
}
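/*
* The DRF example in the computeUserLimit comment can be reproduced with
* plain floats; a toy check of the usage-ratio arithmetic (illustration
* only):
*/
public class UsageRatioExample {
  public static void main(String[] args) {
    // User1 uses 10% of memory and 20% of vcores: dominant share is 20%.
    float user1 = Math.max(0.10f, 0.20f);
    // User2 uses 30% of memory and 10% of vcores: dominant share is 30%.
    float user2 = Math.max(0.30f, 0.10f);
    // The queue's usage ratio is the sum of dominant shares and may
    // legitimately exceed 100%.
    float queueUsageRatio = user1 + user2;
    System.out.println("queue usage ratio = " + queueUsageRatio); // 0.5
  }
}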
/*
* Increment Queue Usage Ratio.
*/
private void incQueueUsageRatio(String nodePartition, float delta) {
qUsageRatios.incUsageRatio(nodePartition, delta);
}
@Override
public void activateApplication(String user, ApplicationId applicationId) {
try {
this.writeLock.lock();
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps == null) {
userApps = new HashSet<ApplicationId>();
usersApplications.put(user, userApps);
activeUsers.incrementAndGet();
metrics.incrActiveUsers();
// A user is added to active list. Invalidate user-limit cache.
userLimitNeedsRecompute();
updateActiveUsersResourceUsage(user);
if (LOG.isDebugEnabled()) {
LOG.debug("User " + user + " added to activeUsers, currently: "
+ activeUsers);
}
}
if (userApps.add(applicationId)) {
metrics.activateApp(user);
}
} finally {
this.writeLock.unlock();
}
}
@Override
public void deactivateApplication(String user, ApplicationId applicationId) {
try {
this.writeLock.lock();
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps != null) {
if (userApps.remove(applicationId)) {
metrics.deactivateApp(user);
}
if (userApps.isEmpty()) {
usersApplications.remove(user);
activeUsers.decrementAndGet();
metrics.decrActiveUsers();
// A user is removed from active list. Invalidate user-limit cache.
userLimitNeedsRecompute();
updateNonActiveUsersResourceUsage(user);
if (LOG.isDebugEnabled()) {
LOG.debug("User " + user + " removed from activeUsers, currently: "
+ activeUsers);
}
}
}
} finally {
this.writeLock.unlock();
}
}
@Override
public int getNumActiveUsers() {
return activeUsers.get();
}
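/*
* activateApplication/deactivateApplication above amount to per-user
* reference counting: a user is active while it owns at least one
* application with outstanding requests. A stripped-down sketch of that
* bookkeeping (hypothetical ActiveUserTracker; metrics and user-limit
* invalidation omitted):
*/
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

class ActiveUserTracker {
  private final Map<String, Set<String>> appsByUser = new HashMap<>();
  private final AtomicInteger activeUsers = new AtomicInteger();

  synchronized void activate(String user, String appId) {
    Set<String> apps = appsByUser.get(user);
    if (apps == null) {
      apps = new HashSet<>();
      appsByUser.put(user, apps);
      activeUsers.incrementAndGet(); // first app: the user becomes active
    }
    apps.add(appId);
  }

  synchronized void deactivate(String user, String appId) {
    Set<String> apps = appsByUser.get(user);
    if (apps != null && apps.remove(appId) && apps.isEmpty()) {
      appsByUser.remove(user);
      activeUsers.decrementAndGet(); // last app gone: no longer active
    }
  }

  int getNumActiveUsers() {
    return activeUsers.get();
  }
}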
private void updateActiveUsersResourceUsage(String userName) {
try {
this.writeLock.lock();
// For UT case: We might need to add the user to users list.
User user = getUserAndAddIfAbsent(userName);
ResourceUsage resourceUsage = user.getResourceUsage();
// If the user moves to the active list, move its resource usage from the
// non-active to the active aggregate.
if (nonActiveUsersSet.contains(userName)) {
nonActiveUsersSet.remove(userName);
activeUsersSet.add(userName);
// Update total resource usage of active and non-active after user
// is moved from non-active to active.
for (String partition : resourceUsage.getNodePartitionsSet()) {
totalResUsageForNonActiveUsers.decUsed(partition,
resourceUsage.getUsed(partition));
totalResUsageForActiveUsers.incUsed(partition,
resourceUsage.getUsed(partition));
}
if (LOG.isDebugEnabled()) {
LOG.debug("User '" + userName
+ "' has become active. Hence move user to active list. "
+ "Active users size = " + activeUsersSet.size()
+ ", Non-active users size = " + nonActiveUsersSet.size()
+ ", Total resource usage for active users = "
+ totalResUsageForActiveUsers.getAllUsed()
+ ", Total resource usage for non-active users = "
+ totalResUsageForNonActiveUsers.getAllUsed());
}
}
} finally {
this.writeLock.unlock();
}
}
private void updateNonActiveUsersResourceUsage(String userName) {
try {
this.writeLock.lock();
// For UT case: We might need to add the user to users list.
User user = getUserAndAddIfAbsent(userName);
ResourceUsage resourceUsage = user.getResourceUsage();
// If the user moves to the non-active list, move its resource usage from
// the active to the non-active aggregate.
if (activeUsersSet.contains(userName)) {
activeUsersSet.remove(userName);
nonActiveUsersSet.add(userName);
// Update total resource usage of active and non-active after user is
// moved from active to non-active.
for (String partition : resourceUsage.getNodePartitionsSet()) {
totalResUsageForActiveUsers.decUsed(partition,
resourceUsage.getUsed(partition));
totalResUsageForNonActiveUsers.incUsed(partition,
resourceUsage.getUsed(partition));
}
if (LOG.isDebugEnabled()) {
LOG.debug("User '" + userName
+ "' has become non-active. Hence move user to non-active list. "
+ "Active users size = " + activeUsersSet.size()
+ ", Non-active users size = " + nonActiveUsersSet.size()
+ ", Total resource usage for active users = "
+ totalResUsageForActiveUsers.getAllUsed()
+ ", Total resource usage for non-active users = "
+ totalResUsageForNonActiveUsers.getAllUsed());
}
}
} finally {
this.writeLock.unlock();
}
}
private ResourceUsage getTotalResourceUsagePerUser(String userName) {
if (nonActiveUsersSet.contains(userName)) {
return totalResUsageForNonActiveUsers;
} else if (activeUsersSet.contains(userName)) {
return totalResUsageForActiveUsers;
} else {
LOG.warn("User '" + userName
+ "' is not present in active/non-active. This is highly unlikely."
+ "We can consider this user in non-active list in this case.");
return totalResUsageForNonActiveUsers;
}
}
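/*
* The two update methods above maintain a simple invariant: a user's usage
* is accounted in exactly one of the two aggregates, so a state change must
* transfer the usage and leave the combined total unchanged. A minimal
* single-resource sketch (hypothetical names, memory only):
*/
class ActiveAggregatesExample {
  private long activeTotalMb;
  private long nonActiveTotalMb;

  /** User becomes active: move its usage; the sum stays constant. */
  void moveToActive(long userUsedMb) {
    nonActiveTotalMb -= userUsedMb;
    activeTotalMb += userUsedMb;
  }

  /** Symmetric move when the user's last application deactivates. */
  void moveToNonActive(long userUsedMb) {
    activeTotalMb -= userUsedMb;
    nonActiveTotalMb += userUsedMb;
  }
}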
/**
* During container allocate/release, ensure that all user-specific data
* structures are updated.
*
* @param userName
* Name of the user
* @param resource
* Resource to increment/decrement
* @param nodePartition
* Node label
* @param isAllocate
* Whether to allocate or release the resource
* @return user
*/
public User updateUserResourceUsage(String userName, Resource resource,
String nodePartition, boolean isAllocate) {
try {
this.writeLock.lock();
// TODO: should use getUser; this method is used only to avoid a UT failure
// caused by a wrong invocation order. The UT will be fixed separately.
User user = getUserAndAddIfAbsent(userName);
// New container is allocated. Invalidate user-limit.
updateResourceUsagePerUser(user, resource, nodePartition, isAllocate);
userLimitNeedsRecompute();
// Update usage ratios
Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
scheduler.getClusterResource());
incQueueUsageRatio(nodePartition, user.updateUsageRatio(
resourceCalculator, resourceByLabel, nodePartition));
return user;
} finally {
this.writeLock.unlock();
}
}
private void updateResourceUsagePerUser(User user, Resource resource,
String nodePartition, boolean isAllocate) {
ResourceUsage totalResourceUsageForUsers = getTotalResourceUsagePerUser(
user.userName);
if (isAllocate) {
user.getResourceUsage().incUsed(nodePartition, resource);
totalResourceUsageForUsers.incUsed(nodePartition, resource);
} else {
user.getResourceUsage().decUsed(nodePartition, resource);
totalResourceUsageForUsers.decUsed(nodePartition, resource);
}
if (LOG.isDebugEnabled()) {
LOG.debug("User resource is updated. "
+ "Total resource usage for active users = "
+ totalResUsageForActiveUsers.getAllUsed()
+ ", Total resource usage for non-active users = "
+ totalResUsageForNonActiveUsers.getAllUsed());
}
}
}

View File

@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
@ -117,24 +117,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
new ConcurrentHashMap<>();
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
Priority.newInstance(0), false);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
appPriority, isAttemptRecovering, null);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());

View File

@ -465,7 +465,7 @@ public class FSLeafQueue extends FSQueue {
}
@Override
public ActiveUsersManager getActiveUsersManager() {
public ActiveUsersManager getAbstractUsersManager() {
return activeUsersManager;
}

View File

@ -294,7 +294,7 @@ public class FSParentQueue extends FSQueue {
}
@Override
public ActiveUsersManager getActiveUsersManager() {
public ActiveUsersManager getAbstractUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}

View File

@ -177,7 +177,7 @@ public class FifoScheduler extends
}
@Override
public ActiveUsersManager getActiveUsersManager() {
public ActiveUsersManager getAbstractUsersManager() {
return activeUsersManager;
}

View File

@ -63,7 +63,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
maxApplications = q.getMaxApplications();
maxApplicationsPerUser = q.getMaxApplicationsPerUser();
userLimit = q.getUserLimit();
users = new UsersInfo(q.getUsers());
users = new UsersInfo(q.getUsersManager().getUsersInfo());
userLimitFactor = q.getUserLimitFactor();
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -361,9 +362,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
queue.getQueueCapacities().getAbsoluteCapacity());
HashSet<String> users = userMap.get(queue.getQueueName());
Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
for (String user : users) {
when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
anyString())).thenReturn(userLimit);
for (String userName : users) {
when(queue.getResourceLimitForAllUsers(eq(userName),
any(Resource.class), anyString(), any(SchedulingMode.class)))
.thenReturn(userLimit);
}
}
}

View File

@ -75,7 +75,7 @@ public class TestSchedulerApplicationAttempt {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, oldQueue, oldQueue.getActiveUsersManager(), rmContext);
user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext);
oldMetrics.submitApp(user);
// confirm that containerId is calculated based on epoch.
@ -170,7 +170,7 @@ public class TestSchedulerApplicationAttempt {
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
return queue;
}
@ -199,7 +199,7 @@ public class TestSchedulerApplicationAttempt {
Queue queue = createQueue("test", null);
SchedulerApplicationAttempt app =
new SchedulerApplicationAttempt(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext);
queue.getAbstractUsersManager(), rmContext);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
@ -212,7 +212,7 @@ public class TestSchedulerApplicationAttempt {
queue = createQueue("test2", null, 0.5f);
app = new SchedulerApplicationAttempt(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext);
queue.getAbstractUsersManager(), rmContext);
app.attemptResourceUsage.incUsed(requestedResource);
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
0.01f);
@ -230,7 +230,7 @@ public class TestSchedulerApplicationAttempt {
queue = createQueue("test3", null, 0.0f);
app = new SchedulerApplicationAttempt(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext);
queue.getAbstractUsersManager(), rmContext);
// Resource request
app.attemptResourceUsage.incUsed(requestedResource);
@ -256,7 +256,7 @@ public class TestSchedulerApplicationAttempt {
final String user = "user1";
Queue queue = createQueue("test", null);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
user, queue, queue.getActiveUsersManager(), rmContext);
user, queue, queue.getAbstractUsersManager(), rmContext);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
@ -275,7 +275,7 @@ public class TestSchedulerApplicationAttempt {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
attemptId, "user", queue, queue.getActiveUsersManager(), rmContext);
attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
Priority priority = Priority.newInstance(1);
SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
assertEquals(0, app.getSchedulingOpportunities(schedulerKey));

View File

@ -192,7 +192,7 @@ public class TestApplicationLimits {
clusterResource));
ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
assertEquals(Resource.newInstance(8 * GB, 1),
queue.calculateAndGetAMResourceLimit());
@ -632,7 +632,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
appAttemptId_0_0, user_0, queue,
queue.getActiveUsersManager(), spyRMContext);
queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@ -644,7 +644,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
Resource expectedHeadroom = Resources.createResource(5*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom
@ -652,7 +652,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
appAttemptId_0_1, user_0, queue,
queue.getActiveUsersManager(), spyRMContext);
queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@ -672,7 +672,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
appAttemptId_1_0, user_1, queue,
queue.getActiveUsersManager(), spyRMContext);
queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@ -691,6 +691,11 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
// Any change in cluster resource needs to trigger user-limit recomputation.
// In the existing code, LeafQueue#updateClusterResource handles this;
// however, that method is not used here.
queue.getUsersManager().userLimitNeedsRecompute();
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes

View File

@ -657,7 +657,7 @@ public class TestApplicationLimitsByPartition {
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0,
queue, queue.getActiveUsersManager(), spyRMContext);
queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@ -669,16 +669,16 @@ public class TestApplicationLimitsByPartition {
queue.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
//head room = queue capacity = 50 % 90% 160 GB
// head room = queue capacity = 50% * 90% * 160 GB * 0.25 (user-limit)
Resource expectedHeadroom =
Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1);
Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0,
queue, queue.getActiveUsersManager(), spyRMContext);
queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@ -701,15 +701,16 @@ public class TestApplicationLimitsByPartition {
assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change
// head room for default label + head room for y partition
// head room for y partition = 0.25 (user-limit) * 50% (b queue capacity) * 160 GB
Resource expectedHeadroomWithReqInY =
Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom);
Resource expectedHeadroomWithReqInY = Resources.add(
Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
expectedHeadroom);
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1,
queue, queue.getActiveUsersManager(), spyRMContext);
queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@ -728,12 +729,12 @@ public class TestApplicationLimitsByPartition {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
// head room = queue capacity = 50% * 90% * 160 GB * 0.25 (user-limit)
expectedHeadroom =
Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1);
Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
// head room for default label + head room for y partition
// head room for y partition = 0.25 (user-limit) * 50% (b queue capacity) * 160 GB
expectedHeadroomWithReqInY =
Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1),
expectedHeadroom);
expectedHeadroomWithReqInY = Resources.add(
Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
expectedHeadroom);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom());
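/*
* A quick arithmetic check of the expected values above, assuming the
* configuration earlier in this test (160 GB total, 90% of it in the
* default partition, queue b at 50%, user-limit-percent 25):
*   default partition: 0.5 * 0.9 * 160 GB * 0.25 = 18 GB
*   partition y:       0.25 * 0.5 * 160 GB       = 20 GB
*   headroom with a request in y: 18 GB + 20 GB  = 38 GB
*/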

View File

@ -184,7 +184,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
String userName, String partition, int memory) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
LeafQueue.User user = queue.getUser(userName);
UsersManager.User user = queue.getUser(userName);
Assert.assertEquals(memory,
user.getResourceUsage().getUsed(partition).getMemorySize());
}
@ -241,7 +241,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
LeafQueue queue =
(LeafQueue) ((CapacityScheduler) rm.getResourceScheduler())
.getQueue("a");
ArrayList<UserInfo> users = queue.getUsers();
ArrayList<UserInfo> users = queue.getUsersManager().getUsersInfo();
for (UserInfo userInfo : users) {
if (userInfo.getUsername().equals("user")) {
ResourceInfo resourcesUsed = userInfo.getResourcesUsed();

View File

@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@ -77,10 +78,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -522,19 +521,22 @@ public class TestLeafQueue {
// Users
final String user_0 = "user_0";
// Active Users Manager
AbstractUsersManager activeUserManager = a.getAbstractUsersManager();
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
activeUserManager, spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), spyRMContext);
activeUserManager, spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
@ -683,7 +685,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, b,
b.getActiveUsersManager(), spyRMContext);
b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app0, user0);
// Setup some nodes
@ -747,14 +749,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, b,
b.getActiveUsersManager(), spyRMContext);
b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app0, user0);
final ApplicationAttemptId appAttemptId2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app2 =
new FiCaSchedulerApp(appAttemptId2, user1, b,
b.getActiveUsersManager(), spyRMContext);
b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app2, user1);
// Setup some nodes
@ -775,6 +777,7 @@ public class TestLeafQueue {
Resource clusterResource =
Resources.createResource(numNodes * (8 * GB), numNodes * 100);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
// Setup resource-requests so that one application is memory dominant
// and other application is vcores dominant
@ -798,7 +801,7 @@ public class TestLeafQueue {
User queueUser1 = b.getUser(user1);
assertEquals("There should 2 active users!", 2, b
.getActiveUsersManager().getNumActiveUsers());
.getAbstractUsersManager().getNumActiveUsers());
// Fill both Nodes as far as we can
CSAssignment assign;
do {
@ -833,7 +836,7 @@ public class TestLeafQueue {
/ (numNodes * 100.0f)
+ queueUser1.getUsed().getMemorySize()
/ (numNodes * 8.0f * GB);
assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
// Add another node and make sure consumedRatio is adjusted
// accordingly.
numNodes = 3;
@ -847,7 +850,7 @@ public class TestLeafQueue {
/ (numNodes * 100.0f)
+ queueUser1.getUsed().getMemorySize()
/ (numNodes * 8.0f * GB);
assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
}
@Test
@ -857,6 +860,9 @@ public class TestLeafQueue {
//unset maxCapacity
a.setMaxCapacity(1.0f);
when(csContext.getClusterResource())
.thenReturn(Resources.createResource(16 * GB, 32));
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
@ -866,14 +872,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_1); // different user
// Setup some nodes
@ -912,7 +918,7 @@ public class TestLeafQueue {
a.setUserLimitFactor(2);
// There're two active users
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
// 1 container to user_0
applyCSAssignment(clusterResource,
@ -947,7 +953,7 @@ public class TestLeafQueue {
// app_0 doesn't have outstanding resources, there's only one active user.
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
1, a.getAbstractUsersManager().getNumActiveUsers());
}
@ -998,7 +1004,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
qb.getActiveUsersManager(), spyRMContext);
qb.getAbstractUsersManager(), spyRMContext);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>();
apps.put(app_0.getApplicationAttemptId(), app_0);
qb.submitApplicationAttempt(app_0, user_0);
@ -1009,7 +1015,7 @@ public class TestLeafQueue {
u0Priority, recordFactory)));
assertEquals("There should only be 1 active user!",
1, qb.getActiveUsersManager().getNumActiveUsers());
1, qb.getAbstractUsersManager().getNumActiveUsers());
//get headroom
applyCSAssignment(clusterResource,
qb.assignContainers(clusterResource, node_0,
@ -1026,7 +1032,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
qb.getActiveUsersManager(), spyRMContext);
qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_2.getApplicationAttemptId(), app_2);
Priority u1Priority = TestUtils.createMockPriority(2);
SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority);
@ -1068,13 +1074,13 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
qb.getActiveUsersManager(), spyRMContext);
qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_1.getApplicationAttemptId(), app_1);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
qb.getActiveUsersManager(), spyRMContext);
qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_3.getApplicationAttemptId(), app_3);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
@ -1103,7 +1109,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(4, 0);
FiCaSchedulerApp app_4 =
new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
qb.getActiveUsersManager(), spyRMContext);
qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_4.getApplicationAttemptId(), app_4);
qb.submitApplicationAttempt(app_4, user_0);
app_4.updateResourceRequests(Collections.singletonList(
@ -1126,9 +1132,9 @@ public class TestLeafQueue {
//testcase3 still active - 2+2+6=10
assertEquals(10*GB, qb.getUsedResources().getMemorySize());
//app4 is user 0
//maxqueue 16G, userlimit 13G, used 8G, headroom 5G
//maxqueue 16G, userlimit 7G, used 8G, headroom 0G
//(8G used is 6G from this test case - app4, 2 from last test case, app_1)
assertEquals(5*GB, app_4.getHeadroom().getMemorySize());
assertEquals(0*GB, app_4.getHeadroom().getMemorySize());
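/*
* Arithmetic for the assertion above, using the numbers in the comment:
* headroom = min(userLimit, maxQueue) - used = min(7G, 16G) - 8G = -1G,
* which is clamped to 0G.
*/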
}
@Test
@ -1147,21 +1153,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes
@ -1247,21 +1253,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes
@ -1301,7 +1307,7 @@ public class TestLeafQueue {
// Now, only user_0 should be active since he is the only one with
// outstanding requests
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
1, a.getAbstractUsersManager().getNumActiveUsers());
// 1 container to user_0
applyCSAssignment(clusterResource,
@ -1312,8 +1318,8 @@ public class TestLeafQueue {
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
// TODO, fix headroom in the future patch
assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
// User limit = 4G, 2 in use
assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
// User limit = 2G, 2 in use
assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
// the application is not yet active
@ -1325,15 +1331,15 @@ public class TestLeafQueue {
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G
assertEquals(1*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
assertEquals(0*GB, app_0.getHeadroom().getMemorySize()); // 3G used > 2G user-limit
assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // 3G used > 2G user-limit
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
@ -1352,7 +1358,7 @@ public class TestLeafQueue {
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers());
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
@ -1378,28 +1384,28 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_2, a,
a.getActiveUsersManager(), spyRMContext);
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_3, user_2);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
@ -1417,7 +1423,8 @@ public class TestLeafQueue {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
@ -1744,6 +1751,7 @@ public class TestLeafQueue {
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
when(csContext.getClusterResource()).thenReturn(Resource.newInstance(8, 1));
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
@ -3648,7 +3656,7 @@ public class TestLeafQueue {
final String user = "user1";
FiCaSchedulerApp app =
new FiCaSchedulerApp(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext);
queue.getAbstractUsersManager(), rmContext);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
@ -3663,7 +3671,7 @@ public class TestLeafQueue {
// child of root, its absolute capacity is also 50%.
queue = createQueue("test2", null, 0.5f, 0.5f);
app = new FiCaSchedulerApp(appAttId, user, queue,
queue.getActiveUsersManager(), rmContext);
queue.getAbstractUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@ -3675,7 +3683,7 @@ public class TestLeafQueue {
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
app = new FiCaSchedulerApp(appAttId, user, qChild,
qChild.getActiveUsersManager(), rmContext);
qChild.getAbstractUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@ -3699,7 +3707,7 @@ public class TestLeafQueue {
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics);
when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
QueueCapacities qCaps = mock(QueueCapacities.class);
when(qCaps.getAbsoluteCapacity(any())).thenReturn(absCap);

View File

@ -1049,7 +1049,7 @@ public class TestNodeLabelContainerAllocation {
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
// Each application request 5 * 1GB container
// Each application requests 50 * 1GB containers
am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
// NM1 does 50 heartbeats
@ -1169,12 +1169,14 @@ public class TestNodeLabelContainerAllocation {
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50);
csConf.setMaximumCapacityByLabel(A, "x", 50);
csConf.setUserLimit(A, 200);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 50);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50);
csConf.setMaximumCapacityByLabel(B, "x", 50);
csConf.setUserLimit(B, 200);
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of(
@ -1207,6 +1209,7 @@ public class TestNodeLabelContainerAllocation {
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
@ -1250,7 +1253,7 @@ public class TestNodeLabelContainerAllocation {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
}
private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
throws InterruptedException {
int totalWaitTick = 100; // wait 10 sec at most.