YARN-9930. Support max running app logic for CapacityScheduler. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2020-06-19 14:50:24 +02:00
parent 9821b94c94
commit 469841446f
14 changed files with 1229 additions and 8 deletions

View File

@ -146,6 +146,7 @@ public abstract class AbstractCSQueue implements CSQueue {
volatile Priority priority = Priority.newInstance(0);
private Map<String, Float> userWeights = new HashMap<String, Float>();
private int maxParallelApps;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@ -390,6 +391,11 @@ public abstract class AbstractCSQueue implements CSQueue {
// and queue setting
setupMaximumAllocation(configuration);
// Max parallel apps
int queueMaxParallelApps =
configuration.getMaxParallelAppsForQueue(getQueuePath());
setMaxParallelApps(queueMaxParallelApps);
// initialized the queue state based on previous state, configured state
// and its parent state.
QueueState previous = getState();
@ -1431,4 +1437,14 @@ public abstract class AbstractCSQueue implements CSQueue {
public boolean getDefaultAppLifetimeWasSpecifiedInConfig() {
return defaultAppLifetimeWasSpecifiedInConfig;
}
public void setMaxParallelApps(int maxParallelApps) {
this.maxParallelApps = maxParallelApps;
}
public int getMaxParallelApps() {
return maxParallelApps;
}
abstract int getNumRunnableApps();
}

View File

@ -0,0 +1,436 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
/**
* Handles tracking and enforcement for user and queue maxRunningApps
* constraints.
*/
public class CSMaxRunningAppsEnforcer {
private static final Logger LOG = LoggerFactory.getLogger(
CSMaxRunningAppsEnforcer.class);
private final CapacityScheduler scheduler;
// Tracks the number of running applications by user.
private final Map<String, Integer> usersNumRunnableApps;
private final ListMultimap<String, FiCaSchedulerApp> usersNonRunnableApps;
public CSMaxRunningAppsEnforcer(CapacityScheduler scheduler) {
this.scheduler = scheduler;
this.usersNumRunnableApps = new HashMap<String, Integer>();
this.usersNonRunnableApps = ArrayListMultimap.create();
}
/**
* Checks whether making the application runnable would exceed any
* maxRunningApps limits. Also sets the "runnable" flag on the
* attempt.
*
* @param attempt the app attempt being checked
* @return true if the application is runnable; false otherwise
*/
public boolean checkRunnabilityWithUpdate(
FiCaSchedulerApp attempt) {
boolean attemptCanRun = !exceedUserMaxParallelApps(attempt.getUser())
&& !exceedQueueMaxParallelApps(attempt.getCSLeafQueue());
attempt.setRunnable(attemptCanRun);
return attemptCanRun;
}
/**
* Checks whether the number of user runnable apps exceeds the limitation.
*
* @param user the user name
* @return true if the number hits the limit; false otherwise
*/
private boolean exceedUserMaxParallelApps(String user) {
Integer userNumRunnable = usersNumRunnableApps.get(user);
if (userNumRunnable == null) {
userNumRunnable = 0;
}
if (userNumRunnable >= getUserMaxParallelApps(user)) {
LOG.info("Maximum runnable apps exceeded for user {}", user);
return true;
}
return false;
}
/**
* Recursively checks whether the number of queue runnable apps exceeds the
* limitation.
*
* @param queue the current queue
* @return true if the number hits the limit; false otherwise
*/
private boolean exceedQueueMaxParallelApps(AbstractCSQueue queue) {
// Check queue and all parent queues
while (queue != null) {
if (queue.getNumRunnableApps() >= queue.getMaxParallelApps()) {
LOG.info("Maximum runnable apps exceeded for queue {}",
queue.getQueuePath());
return true;
}
queue = (AbstractCSQueue) queue.getParent();
}
return false;
}
public void trackApp(FiCaSchedulerApp app) {
if (app.isRunnable()) {
trackRunnableApp(app);
} else {
trackNonRunnableApp(app);
}
}
/**
* Tracks the given new runnable app for purposes of maintaining max running
* app limits.
*/
private void trackRunnableApp(FiCaSchedulerApp app) {
String user = app.getUser();
AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
// Increment running counts for all parent queues
ParentQueue parent = (ParentQueue) queue.getParent();
while (parent != null) {
parent.incrementRunnableApps();
parent = (ParentQueue) parent.getParent();
}
Integer userNumRunnable = usersNumRunnableApps.get(user);
usersNumRunnableApps.put(user, (userNumRunnable == null ? 0
: userNumRunnable) + 1);
}
/**
* Tracks the given new non runnable app so that it can be made runnable when
* it would not violate max running app limits.
*/
private void trackNonRunnableApp(FiCaSchedulerApp app) {
String user = app.getUser();
usersNonRunnableApps.put(user, app);
}
/**
* This is called after reloading the allocation configuration when the
* scheduler is reinitialized
*
* Checks to see whether any non-runnable applications become runnable
* now that the max running apps of given queue has been changed
*
* Runs in O(n) where n is the number of apps that are non-runnable and in
* the queues that went from having no slack to having slack.
*/
public void updateRunnabilityOnReload() {
ParentQueue rootQueue = (ParentQueue) scheduler.getRootQueue();
List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
new ArrayList<List<FiCaSchedulerApp>>();
gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
}
/**
* Checks to see whether any other applications runnable now that the given
* application has been removed from the given queue. And makes them so.
*
* Runs in O(n log(n)) where n is the number of queues that are under the
* highest queue that went from having no slack to having slack.
*/
public void updateRunnabilityOnAppRemoval(FiCaSchedulerApp app) {
// childqueueX might have no pending apps itself, but if a queue higher up
// in the hierarchy parentqueueY has a maxRunningApps set, an app completion
// in childqueueX could allow an app in some other distant child of
// parentqueueY to become runnable.
// An app removal will only possibly allow another app to become runnable if
// the queue was already at its max before the removal.
// Thus we find the ancestor queue highest in the tree for which the app
// that was at its maxRunningApps before the removal.
LeafQueue queue = app.getCSLeafQueue();
AbstractCSQueue highestQueueWithAppsNowRunnable =
(queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1)
? queue : null;
ParentQueue parent = (ParentQueue) queue.getParent();
while (parent != null) {
if (parent.getNumRunnableApps() == parent.getMaxParallelApps() - 1) {
highestQueueWithAppsNowRunnable = parent;
}
parent = (ParentQueue) parent.getParent();
}
List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
new ArrayList<List<FiCaSchedulerApp>>();
// Compile lists of apps which may now be runnable
// We gather lists instead of building a set of all non-runnable apps so
// that this whole operation can be O(number of queues) instead of
// O(number of apps)
if (highestQueueWithAppsNowRunnable != null) {
gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
appsNowMaybeRunnable);
}
String user = app.getUser();
Integer userNumRunning = usersNumRunnableApps.get(user);
if (userNumRunning == null) {
userNumRunning = 0;
}
if (userNumRunning == getUserMaxParallelApps(user) - 1) {
List<FiCaSchedulerApp> userWaitingApps = usersNonRunnableApps.get(user);
if (userWaitingApps != null) {
appsNowMaybeRunnable.add(userWaitingApps);
}
}
updateAppsRunnability(appsNowMaybeRunnable,
appsNowMaybeRunnable.size());
}
/**
* Checks to see whether applications are runnable now by iterating
* through each one of them and check if the queue and user have slack.
*
* if we know how many apps can be runnable, there is no need to iterate
* through all apps, maxRunnableApps is used to break out of the iteration.
*/
private void updateAppsRunnability(List<List<FiCaSchedulerApp>>
appsNowMaybeRunnable, int maxRunnableApps) {
// Scan through and check whether this means that any apps are now runnable
Iterator<FiCaSchedulerApp> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
FiCaSchedulerApp prev = null;
List<FiCaSchedulerApp> noLongerPendingApps = new ArrayList<>();
while (iter.hasNext()) {
FiCaSchedulerApp next = iter.next();
if (next == prev) {
continue;
}
if (checkRunnabilityWithUpdate(next)) {
LeafQueue nextQueue = next.getCSLeafQueue();
LOG.info("{} is now runnable in {}",
next.getApplicationAttemptId(), nextQueue);
trackRunnableApp(next);
FiCaSchedulerApp appSched = next;
nextQueue.submitApplicationAttempt(next, next.getUser());
noLongerPendingApps.add(appSched);
if (noLongerPendingApps.size() >= maxRunnableApps) {
break;
}
}
prev = next;
}
// We remove the apps from their pending lists afterwards so that we don't
// pull them out from under the iterator. If they are not in these lists
// in the first place, there is a bug.
for (FiCaSchedulerApp appSched : noLongerPendingApps) {
if (!(appSched.getCSLeafQueue().removeNonRunnableApp(appSched))) {
LOG.error("Can't make app runnable that does not already exist in queue"
+ " as non-runnable: {}. This should never happen.",
appSched.getApplicationAttemptId());
}
if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
LOG.error("Waiting app {} expected to be in "
+ "usersNonRunnableApps, but was not. This should never happen.",
appSched.getApplicationAttemptId());
}
}
}
public void untrackApp(FiCaSchedulerApp app) {
if (app.isRunnable()) {
untrackRunnableApp(app);
} else {
untrackNonRunnableApp(app);
}
}
/**
* Updates the relevant tracking variables after a runnable app with the given
* queue and user has been removed.
*/
private void untrackRunnableApp(FiCaSchedulerApp app) {
// Update usersRunnableApps
String user = app.getUser();
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
if (newUserNumRunning == 0) {
usersNumRunnableApps.remove(user);
} else {
usersNumRunnableApps.put(user, newUserNumRunning);
}
// Update runnable app bookkeeping for queues
AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
ParentQueue parent = (ParentQueue) queue.getParent();
while (parent != null) {
parent.decrementRunnableApps();
parent = (ParentQueue) parent.getParent();
}
}
/**
* Stops tracking the given non-runnable app.
*/
private void untrackNonRunnableApp(FiCaSchedulerApp app) {
usersNonRunnableApps.remove(app.getUser(), app);
}
/**
* Traverses the queue hierarchy under the given queue to gather all lists
* of non-runnable applications.
*/
private void gatherPossiblyRunnableAppLists(AbstractCSQueue queue,
List<List<FiCaSchedulerApp>> appLists) {
if (queue.getNumRunnableApps() < queue.getMaxParallelApps()) {
if (queue instanceof LeafQueue) {
appLists.add(
((LeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
} else {
for (CSQueue child : queue.getChildQueues()) {
gatherPossiblyRunnableAppLists((AbstractCSQueue) child, appLists);
}
}
}
}
private int getUserMaxParallelApps(String user) {
CapacitySchedulerConfiguration conf = scheduler.getConfiguration();
if (conf == null) {
return Integer.MAX_VALUE;
}
int userMaxParallelApps = conf.getMaxParallelAppsForUser(user);
return userMaxParallelApps;
}
/**
* Takes a list of lists, each of which is ordered by start time, and returns
* their elements in order of start time.
*
* We maintain positions in each of the lists. Each next() call advances
* the position in one of the lists. We maintain a heap that orders lists
* by the start time of the app in the current position in that list.
* This allows us to pick which list to advance in O(log(num lists)) instead
* of O(num lists) time.
*/
static class MultiListStartTimeIterator implements
Iterator<FiCaSchedulerApp> {
private List<FiCaSchedulerApp>[] appLists;
private int[] curPositionsInAppLists;
private PriorityQueue<IndexAndTime> appListsByCurStartTime;
@SuppressWarnings("unchecked")
MultiListStartTimeIterator(List<List<FiCaSchedulerApp>> appListList) {
appLists = appListList.toArray(new List[appListList.size()]);
curPositionsInAppLists = new int[appLists.length];
appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
for (int i = 0; i < appLists.length; i++) {
long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0)
.getStartTime();
appListsByCurStartTime.add(new IndexAndTime(i, time));
}
}
@Override
public boolean hasNext() {
return !appListsByCurStartTime.isEmpty()
&& appListsByCurStartTime.peek().time != Long.MAX_VALUE;
}
@Override
public FiCaSchedulerApp next() {
IndexAndTime indexAndTime = appListsByCurStartTime.remove();
int nextListIndex = indexAndTime.index;
FiCaSchedulerApp next = appLists[nextListIndex]
.get(curPositionsInAppLists[nextListIndex]);
curPositionsInAppLists[nextListIndex]++;
if (curPositionsInAppLists[nextListIndex] <
appLists[nextListIndex].size()) {
indexAndTime.time = appLists[nextListIndex]
.get(curPositionsInAppLists[nextListIndex]).getStartTime();
} else {
indexAndTime.time = Long.MAX_VALUE;
}
appListsByCurStartTime.add(indexAndTime);
return next;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
private static class IndexAndTime implements Comparable<IndexAndTime> {
private int index;
private long time;
IndexAndTime(int index, long time) {
this.index = index;
this.time = time;
}
@Override
public int compareTo(IndexAndTime o) {
return time < o.time ? -1 : (time > o.time ? 1 : 0);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof IndexAndTime)) {
return false;
}
IndexAndTime other = (IndexAndTime)o;
return other.time == time;
}
@Override
public int hashCode() {
return (int)time;
}
}
}
}

View File

@ -242,8 +242,11 @@ public class CapacityScheduler extends
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private long asyncMaxPendingBacklogs;
private CSMaxRunningAppsEnforcer maxRunningEnforcer;
public CapacityScheduler() {
super(CapacityScheduler.class.getName());
this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
}
@Override
@ -483,6 +486,7 @@ public class CapacityScheduler extends
super.reinitialize(newConf, rmContext);
}
maxRunningEnforcer.updateRunnabilityOnReload();
} finally {
writeLock.unlock();
}
@ -1083,6 +1087,9 @@ public class CapacityScheduler extends
// SchedulerApplication#setCurrentAppAttempt.
attempt.setPriority(application.getPriority());
maxRunningEnforcer.checkRunnabilityWithUpdate(attempt);
maxRunningEnforcer.trackApp(attempt);
queue.submitApplicationAttempt(attempt, application.getUser());
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
@ -1176,8 +1183,13 @@ public class CapacityScheduler extends
LOG.error(
"Cannot finish application " + "from non-leaf queue: "
+ csQueue.getQueuePath());
} else{
} else {
csQueue.finishApplicationAttempt(attempt, csQueue.getQueuePath());
maxRunningEnforcer.untrackApp(attempt);
if (attempt.isRunnable()) {
maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
}
}
} finally {
writeLock.unlock();
@ -3253,4 +3265,9 @@ public class CapacityScheduler extends
public int getNumAsyncSchedulerThreads() {
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
}
@VisibleForTesting
public void setMaxRunningAppsEnforcer(CSMaxRunningAppsEnforcer enforcer) {
this.maxRunningEnforcer = enforcer;
}
}

View File

@ -378,6 +378,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps";
public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
/**
* Different resource types supported.
*/
@ -412,7 +416,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
return queueName;
}
static String getUserPrefix(String user) {
return PREFIX + "user." + user + DOT;
}
private String getNodeLabelPrefix(String queue, String label) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
return getQueuePrefix(queue);
@ -1392,6 +1400,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
}
public Integer getMaxParallelAppsForQueue(String queue) {
int defaultMaxParallelAppsForQueue =
getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
+ MAX_PARALLEL_APPLICATIONS);
return (maxParallelAppsForQueue != null) ?
Integer.parseInt(maxParallelAppsForQueue)
: defaultMaxParallelAppsForQueue;
}
public Integer getMaxParallelAppsForUser(String user) {
int defaultMaxParallelAppsForUser =
getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
DEFAULT_MAX_PARALLEL_APPLICATIONS);
String maxParallelAppsForUser = get(getUserPrefix(user)
+ MAX_PARALLEL_APPLICATIONS);
return (maxParallelAppsForUser != null) ?
Integer.parseInt(maxParallelAppsForUser)
: defaultMaxParallelAppsForUser;
}
private static final String PREEMPTION_CONFIG_PREFIX =
"yarn.resourcemanager.monitor.capacity.preemption.";

View File

@ -129,6 +129,9 @@ public class LeafQueue extends AbstractCSQueue {
List<AppPriorityACLGroup> priorityAcls =
new ArrayList<AppPriorityACLGroup>();
private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@ -159,6 +162,7 @@ public class LeafQueue extends AbstractCSQueue {
setupQueueConfigs(clusterResource, csContext.getConfiguration());
}
@SuppressWarnings("checkstyle:nowhitespaceafter")
protected void setupQueueConfigs(Resource clusterResource,
CapacitySchedulerConfiguration conf) throws
IOException {
@ -289,7 +293,9 @@ public class LeafQueue extends AbstractCSQueue {
+ " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
+ "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
+ " [= (int)(maxApplications * (userLimit / 100.0f) * "
+ "userLimitFactor) ]" + "\n" + "usedCapacity = "
+ "userLimitFactor) ]" + "\n"
+ "maxParallelApps = " + getMaxParallelApps() + "\n"
+ "usedCapacity = " +
+ queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
+ "(clusterResourceMemory * absoluteCapacity)]" + "\n"
+ "absoluteUsedCapacity = " + absoluteUsedCapacity
@ -386,7 +392,8 @@ public class LeafQueue extends AbstractCSQueue {
public int getNumApplications() {
readLock.lock();
try {
return getNumPendingApplications() + getNumActiveApplications();
return getNumPendingApplications() + getNumActiveApplications() +
getNumNonRunnableApps();
} finally {
readLock.unlock();
}
@ -887,16 +894,28 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.unlock();
}
}
private void addApplicationAttempt(FiCaSchedulerApp application,
User user) {
writeLock.lock();
try {
applicationAttemptMap.put(application.getApplicationAttemptId(),
application);
if (application.isRunnable()) {
runnableApps.add(application);
LOG.debug("Adding runnable application: {}",
application.getApplicationAttemptId());
} else {
nonRunnableApps.add(application);
LOG.info("Application attempt {} is not runnable,"
+ " parallel limit reached", application.getApplicationAttemptId());
return;
}
// Accept
user.submitApplication();
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
applicationAttemptMap.put(application.getApplicationAttemptId(),
application);
// Activate applications
if (Resources.greaterThan(resourceCalculator, lastClusterResource,
@ -917,7 +936,9 @@ public class LeafQueue extends AbstractCSQueue {
.getPendingApplications() + " #user-active-applications: " + user
.getActiveApplications() + " #queue-pending-applications: "
+ getNumPendingApplications() + " #queue-active-applications: "
+ getNumActiveApplications());
+ getNumActiveApplications()
+ " #queue-nonrunnable-applications: "
+ getNumNonRunnableApps());
} finally {
writeLock.unlock();
}
@ -950,6 +971,15 @@ public class LeafQueue extends AbstractCSQueue {
// which is caused by wrong invoking order, will fix UT separately
User user = usersManager.getUserAndAddIfAbsent(userName);
boolean runnable = runnableApps.remove(application);
if (!runnable) {
// removeNonRunnableApp acquires the write lock again, which is fine
if (!removeNonRunnableApp(application)) {
LOG.error("Given app to remove " + application +
" does not exist in queue " + getQueuePath());
}
}
String partitionName = application.getAppAMNodePartitionName();
boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
@ -2229,4 +2259,43 @@ public class LeafQueue extends AbstractCSQueue {
usedSeconds);
metrics.updatePreemptedForCustomResources(containerResource);
}
@Override
int getNumRunnableApps() {
readLock.lock();
try {
return runnableApps.size();
} finally {
readLock.unlock();
}
}
int getNumNonRunnableApps() {
readLock.lock();
try {
return nonRunnableApps.size();
} finally {
readLock.unlock();
}
}
boolean removeNonRunnableApp(FiCaSchedulerApp app) {
writeLock.lock();
try {
return nonRunnableApps.remove(app);
} finally {
writeLock.unlock();
}
}
List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
List<FiCaSchedulerApp> appsToReturn = new ArrayList<>();
readLock.lock();
try {
appsToReturn.addAll(nonRunnableApps);
} finally {
readLock.unlock();
}
return appsToReturn;
}
}

View File

@ -93,6 +93,8 @@ public class ParentQueue extends AbstractCSQueue {
private long lastSkipQueueDebugLoggingTimestamp = -1;
private int runnableApps;
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
@ -1383,4 +1385,32 @@ public class ParentQueue extends AbstractCSQueue {
public QueueOrderingPolicy getQueueOrderingPolicy() {
return queueOrderingPolicy;
}
@Override
int getNumRunnableApps() {
readLock.lock();
try {
return runnableApps;
} finally {
readLock.unlock();
}
}
void incrementRunnableApps() {
writeLock.lock();
try {
runnableApps++;
} finally {
writeLock.unlock();
}
}
void decrementRunnableApps() {
writeLock.lock();
try {
runnableApps--;
} finally {
writeLock.unlock();
}
}
}

View File

@ -112,6 +112,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private AbstractContainerAllocator containerAllocator;
private boolean runnable;
/**
* to hold the message if its app doesn't not get container from a node
*/
@ -139,6 +141,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
this.runnable = true;
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@ -1219,4 +1222,22 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
writeLock.unlock();
}
}
public void setRunnable(boolean runnable) {
writeLock.lock();
try {
this.runnable = runnable;
} finally {
writeLock.unlock();
}
}
public boolean isRunnable() {
readLock.lock();
try {
return runnable;
} finally {
readLock.unlock();
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSMaxRunningAppsEnforcer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
@ -175,6 +176,9 @@ public class TestReservationSystem extends
CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
cs.setConf(conf);
CSMaxRunningAppsEnforcer enforcer =
Mockito.mock(CSMaxRunningAppsEnforcer.class);
cs.setMaxRunningAppsEnforcer(enforcer);
mockRMContext = ReservationSystemTestUtil.createRMContext(conf);

View File

@ -184,6 +184,7 @@ public class TestApplicationLimits {
doReturn(amResource).when(application).getAMResource(
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
when(application.isRunnable()).thenReturn(true);
return application;
}

View File

@ -0,0 +1,278 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.Before;
import org.junit.Test;
public class TestCSMaxRunningAppsEnforcer {
private CapacitySchedulerQueueManager queueManager;
private CSMaxRunningAppsEnforcer maxAppsEnforcer;
private int appNum;
private ControlledClock clock;
private RMContext rmContext;
private CapacityScheduler scheduler;
private ActivitiesManager activitiesManager;
private CapacitySchedulerConfiguration csConfig;
@Before
public void setup() throws IOException {
csConfig = new CapacitySchedulerConfiguration();
rmContext = mock(RMContext.class);
when(rmContext.getYarnConfiguration()).thenReturn(csConfig);
when(rmContext.getRMApps()).thenReturn(new ConcurrentHashMap<>());
clock = new ControlledClock();
scheduler = mock(CapacityScheduler.class);
when(rmContext.getScheduler()).thenReturn(scheduler);
when(scheduler.getConf()).thenReturn(csConfig);
when(scheduler.getConfig()).thenReturn(csConfig);
when(scheduler.getConfiguration()).thenReturn(csConfig);
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
when(scheduler.getRMContext()).thenReturn(rmContext);
when(scheduler.getClusterResource())
.thenReturn(Resource.newInstance(16384, 8));
when(scheduler.getMinimumAllocation())
.thenReturn(Resource.newInstance(1024, 1));
when(scheduler.getMinimumResourceCapability())
.thenReturn(Resource.newInstance(1024, 1));
activitiesManager = mock(ActivitiesManager.class);
maxAppsEnforcer = new CSMaxRunningAppsEnforcer(scheduler);
appNum = 0;
setupQueues(csConfig);
RMNodeLabelsManager labelManager = mock(RMNodeLabelsManager.class);
AppPriorityACLsManager appPriorityACLManager =
mock(AppPriorityACLsManager.class);
when(rmContext.getNodeLabelManager()).thenReturn(labelManager);
when(labelManager.getResourceByLabel(anyString(), any(Resource.class)))
.thenReturn(Resource.newInstance(16384, 8));
queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager,
appPriorityACLManager);
queueManager.setCapacitySchedulerContext(scheduler);
queueManager.initializeQueues(csConfig);
}
private void setupQueues(CapacitySchedulerConfiguration config) {
config.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"queue1", "queue2"});
config.setQueues("root.queue1", new String[] {"subqueue1", "subqueue2"});
config.setQueues("root.queue1.subqueue1", new String[] {"leaf1"});
config.setQueues("root.queue1.subqueue2", new String[] {"leaf2"});
config.setFloat(PREFIX + "root.capacity", 100.0f);
config.setFloat(PREFIX + "root.queue1.capacity", 50.0f);
config.setFloat(PREFIX + "root.queue2.capacity", 50.0f);
config.setFloat(PREFIX + "root.queue1.subqueue1.capacity", 50.0f);
config.setFloat(PREFIX + "root.queue1.subqueue2.capacity", 50.0f);
config.setFloat(PREFIX + "root.queue1.subqueue1.leaf1.capacity", 100.0f);
config.setFloat(PREFIX + "root.queue1.subqueue2.leaf2.capacity", 100.0f);
}
private FiCaSchedulerApp addApp(LeafQueue queue, String user) {
ApplicationId appId = ApplicationId.newInstance(0, appNum++);
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
FiCaSchedulerApp attempt = new FiCaSchedulerApp(attId,
user, queue, queue.getAbstractUsersManager(),
rmContext, Priority.newInstance(0), false,
activitiesManager) {
private final long startTime = clock.getTime();
@Override
public long getStartTime() {
return startTime;
}
};
maxAppsEnforcer.checkRunnabilityWithUpdate(attempt);
maxAppsEnforcer.trackApp(attempt);
queue.submitApplicationAttempt(attempt, attempt.getUser());
return attempt;
}
private void removeApp(FiCaSchedulerApp attempt) {
LeafQueue queue = attempt.getCSLeafQueue();
queue.finishApplicationAttempt(attempt, queue.getQueuePath());
maxAppsEnforcer.untrackApp(attempt);
maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt);
}
@Test
public void testRemoveDoesNotEnableAnyApp() {
ParentQueue root =
(ParentQueue) queueManager.getRootQueue();
LeafQueue leaf1 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue1.leaf1");
LeafQueue leaf2 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue2.leaf2");
root.setMaxParallelApps(2);
leaf1.setMaxParallelApps(1);
leaf2.setMaxParallelApps(1);
FiCaSchedulerApp app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
assertEquals(1, leaf1.getNumRunnableApps());
assertEquals(1, leaf2.getNumRunnableApps());
assertEquals(1, leaf2.getNumNonRunnableApps());
removeApp(app1);
assertEquals(0, leaf1.getNumRunnableApps());
assertEquals(1, leaf2.getNumRunnableApps());
assertEquals(1, leaf2.getNumNonRunnableApps());
}
@Test
public void testRemoveEnablesAppOnCousinQueue() {
LeafQueue leaf1 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue1.leaf1");
LeafQueue leaf2 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue2.leaf2");
ParentQueue queue1 = (ParentQueue) queueManager
.getQueueByFullName("root.queue1");
queue1.setMaxParallelApps(2);
FiCaSchedulerApp app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
assertEquals(1, leaf1.getNumRunnableApps());
assertEquals(1, leaf2.getNumRunnableApps());
assertEquals(1, leaf2.getNumNonRunnableApps());
removeApp(app1);
assertEquals(0, leaf1.getNumRunnableApps());
assertEquals(2, leaf2.getNumRunnableApps());
assertEquals(0, leaf2.getNumNonRunnableApps());
}
@Test
public void testRemoveEnablesOneByQueueOneByUser() {
LeafQueue leaf1 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue1.leaf1");
LeafQueue leaf2 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue2.leaf2");
leaf1.setMaxParallelApps(2);
//userMaxApps.put("user1", 1);
csConfig.setInt(PREFIX + "user.user1.max-parallel-apps", 1);
FiCaSchedulerApp app1 = addApp(leaf1, "user1");
addApp(leaf1, "user2");
addApp(leaf1, "user3");
addApp(leaf2, "user1");
assertEquals(2, leaf1.getNumRunnableApps());
assertEquals(1, leaf1.getNumNonRunnableApps());
assertEquals(1, leaf2.getNumNonRunnableApps());
removeApp(app1);
assertEquals(2, leaf1.getNumRunnableApps());
assertEquals(1, leaf2.getNumRunnableApps());
assertEquals(0, leaf1.getNumNonRunnableApps());
assertEquals(0, leaf2.getNumNonRunnableApps());
}
@Test
public void testRemoveEnablingOrderedByStartTime() {
LeafQueue leaf1 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue1.leaf1");
LeafQueue leaf2 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue2.leaf2");
ParentQueue queue1 = (ParentQueue) queueManager
.getQueueByFullName("root.queue1");
queue1.setMaxParallelApps(2);
FiCaSchedulerApp app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
clock.tickSec(20);
addApp(leaf1, "user");
assertEquals(1, leaf1.getNumRunnableApps());
assertEquals(1, leaf2.getNumRunnableApps());
assertEquals(1, leaf1.getNumNonRunnableApps());
assertEquals(1, leaf2.getNumNonRunnableApps());
removeApp(app1);
assertEquals(0, leaf1.getNumRunnableApps());
assertEquals(2, leaf2.getNumRunnableApps());
assertEquals(0, leaf2.getNumNonRunnableApps());
}
@Test
public void testMultipleAppsWaitingOnCousinQueue() {
LeafQueue leaf1 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue1.leaf1");
LeafQueue leaf2 = (LeafQueue) queueManager
.getQueueByFullName("root.queue1.subqueue2.leaf2");
ParentQueue queue1 = (ParentQueue) queueManager
.getQueueByFullName("root.queue1");
queue1.setMaxParallelApps(2);
FiCaSchedulerApp app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
assertEquals(1, leaf1.getNumRunnableApps());
assertEquals(1, leaf2.getNumRunnableApps());
assertEquals(2, leaf2.getNumNonRunnableApps());
removeApp(app1);
assertEquals(0, leaf1.getNumRunnableApps());
assertEquals(2, leaf2.getNumRunnableApps());
assertEquals(1, leaf2.getNumNonRunnableApps());
}
@Test
public void testMultiListStartTimeIteratorEmptyAppLists() {
List<List<FiCaSchedulerApp>> lists =
new ArrayList<List<FiCaSchedulerApp>>();
lists.add(Arrays.asList(mockAppAttempt(1)));
lists.add(Arrays.asList(mockAppAttempt(2)));
Iterator<FiCaSchedulerApp> iter =
new CSMaxRunningAppsEnforcer.MultiListStartTimeIterator(lists);
assertEquals(1, iter.next().getStartTime());
assertEquals(2, iter.next().getStartTime());
}
private FiCaSchedulerApp mockAppAttempt(long startTime) {
FiCaSchedulerApp schedApp = mock(FiCaSchedulerApp.class);
when(schedApp.getStartTime()).thenReturn(startTime);
return schedApp;
}
}

View File

@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Sets;
public class TestCapacitySchedulerMaxParallelApps {
private CapacitySchedulerConfiguration conf;
private MockRM rm;
private MockNM nm1;
private RMApp app1;
private MockAM am1;
private RMApp app2;
private MockAM am2;
private RMApp app3;
private RMAppAttempt attempt3;
private RMApp app4;
private RMAppAttempt attempt4;
private ParentQueue rootQueue;
private LeafQueue defaultQueue;
@Before
public void setUp() {
CapacitySchedulerConfiguration config =
new CapacitySchedulerConfiguration();
config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getName());
conf = new CapacitySchedulerConfiguration(config);
}
@After
public void after() {
if (rm != null) {
rm.stop();
}
}
@Test(timeout = 30000)
public void testMaxParallelAppsExceedsQueueSetting() throws Exception {
conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
executeCommonStepsAndChecks();
testWhenSettingsExceeded();
}
@Test(timeout = 30000)
public void testMaxParallelAppsExceedsDefaultQueueSetting()
throws Exception {
conf.setInt("yarn.scheduler.capacity.max-parallel-apps", 2);
executeCommonStepsAndChecks();
testWhenSettingsExceeded();
}
@Test(timeout = 30000)
public void testMaxParallelAppsExceedsUserSetting() throws Exception {
conf.setInt("yarn.scheduler.capacity.user.testuser.max-parallel-apps", 2);
executeCommonStepsAndChecks();
testWhenSettingsExceeded();
}
@Test(timeout = 30000)
public void testMaxParallelAppsExceedsDefaultUserSetting() throws Exception {
conf.setInt("yarn.scheduler.capacity.user.max-parallel-apps", 2);
executeCommonStepsAndChecks();
testWhenSettingsExceeded();
}
@Test(timeout = 30000)
public void testMaxParallelAppsWhenReloadingConfig() throws Exception {
conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
executeCommonStepsAndChecks();
RMContext rmContext = rm.getRMContext();
// Disable parallel apps setting + max out AM percent
conf.unset("yarn.scheduler.capacity.root.default.max-parallel-apps");
conf.setFloat(PREFIX + "maximum-am-resource-percent", 1.0f);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
cs.reinitialize(conf, rmContext);
// Both app #3 and app #4 should transition to RUNNABLE
launchAMandWaitForRunning(app3, attempt3, nm1);
launchAMandWaitForRunning(app4, attempt4, nm1);
verifyRunningAndAcceptedApps(4, 0);
}
@Test(timeout = 30000)
public void testMaxAppsReachedWithNonRunnableApps() throws Exception {
conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
conf.setInt("yarn.scheduler.capacity.root.default.maximum-applications", 4);
executeCommonStepsAndChecks();
RMApp app5 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app5")
.withUser("testuser")
.withQueue("default")
.withWaitForAppAcceptedState(false)
.build());
rm.waitForState(app5.getApplicationId(), RMAppState.FAILED);
}
private void executeCommonStepsAndChecks() throws Exception {
rm = new MockRM(conf);
rm.start();
nm1 = rm.registerNode("h1:1234", 4096, 8);
rm.registerNode("h2:1234", 4096, 8);
rm.registerNode("h3:1234", 4096, 8);
rm.drainEvents();
app1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app1")
.withUser("testuser")
.withQueue("default")
.build());
am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
app2 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app2")
.withUser("testuser")
.withQueue("default")
.build());
am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
app3 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app3")
.withUser("testuser")
.withQueue("default")
.build());
attempt3 = MockRM.waitForAttemptScheduled(app3, rm);
app4 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
.withAppName("app4")
.withUser("testuser")
.withQueue("default")
.build());
attempt4 = MockRM.waitForAttemptScheduled(app4, rm);
// Check that app attempt #3 and #4 are non-runnable
rootQueue = getRootQueue();
defaultQueue = getDefaultQueue();
Set<ApplicationAttemptId> nonRunnables =
Sets.newHashSet(
attempt3.getAppAttemptId(),
attempt4.getAppAttemptId());
verifyRunnableAppsInParent(rootQueue, 2);
verifyRunnableAppsInLeaf(defaultQueue, 2, nonRunnables);
verifyRunningAndAcceptedApps(2, 2);
}
private void testWhenSettingsExceeded() throws Exception {
// Stop app #1
unregisterAMandWaitForFinish(app1, am1, nm1);
// Launch app #3
launchAMandWaitForRunning(app3, attempt3, nm1);
// Check that attempt #4 is still non-runnable
verifyRunnableAppsInParent(rootQueue, 2);
verifyRunnableAppsInLeaf(defaultQueue, 2,
Collections.singleton(attempt4.getAppAttemptId()));
verifyRunningAndAcceptedApps(2, 1);
// Stop app #2
unregisterAMandWaitForFinish(app2, am2, nm1);
// Launch app #4
launchAMandWaitForRunning(app4, attempt4, nm1);
verifyRunnableAppsInParent(rootQueue, 2);
verifyRunnableAppsInLeaf(defaultQueue, 2,
Collections.emptySet());
verifyRunningAndAcceptedApps(2, 0);
}
@SuppressWarnings("checkstyle:hiddenfield")
private LeafQueue getDefaultQueue() {
CSQueue defaultQueue =
((CapacityScheduler) rm.getResourceScheduler()).getQueue("default");
return (LeafQueue) defaultQueue;
}
private ParentQueue getRootQueue() {
CSQueue root =
((CapacityScheduler) rm.getResourceScheduler()).getQueue("root");
return (ParentQueue) root;
}
private void verifyRunnableAppsInParent(ParentQueue queue,
int expectedRunnable) {
assertEquals("Num of runnable apps", expectedRunnable,
queue.getNumRunnableApps());
}
private void verifyRunnableAppsInLeaf(LeafQueue queue, int expectedRunnable,
Set<ApplicationAttemptId> nonRunnableIds) {
assertEquals("Num of runnable apps", expectedRunnable,
queue.getNumRunnableApps());
queue.getCopyOfNonRunnableAppSchedulables()
.stream()
.map(fca -> fca.getApplicationAttemptId())
.forEach(id -> assertTrue(id + " not found as non-runnable",
nonRunnableIds.contains(id)));
}
private void verifyRunningAndAcceptedApps(int expectedRunning,
int expectedAccepted) throws YarnException {
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
GetApplicationsResponse resp =
rm.getClientRMService().getApplications(request);
List<ApplicationReport> apps = resp.getApplicationList();
long runningCount = apps
.stream()
.filter(report ->
report.getYarnApplicationState() == YarnApplicationState.RUNNING)
.count();
long acceptedCount = apps
.stream()
.filter(report ->
report.getYarnApplicationState() == YarnApplicationState.ACCEPTED)
.count();
assertEquals("Running apps count", expectedRunning, runningCount);
assertEquals("Accepted apps count", expectedAccepted, acceptedCount);
}
private void unregisterAMandWaitForFinish(RMApp app, MockAM am, MockNM nm)
throws Exception {
am.unregisterAppAttempt();
nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 1,
ContainerState.COMPLETE);
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.FINISHED);
}
@SuppressWarnings("rawtypes")
private MockAM launchAMandWaitForRunning(RMApp app, RMAppAttempt attempt,
MockNM nm) throws Exception {
nm.nodeHeartbeat(true);
((AbstractYarnScheduler)rm.getResourceScheduler()).update();
rm.drainEvents();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
return am;
}
}

View File

@ -456,6 +456,8 @@ public class TestLeafQueue {
@Test
public void testAppAttemptMetrics() throws Exception {
CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
cs.setMaxRunningAppsEnforcer(enforcer);
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));

View File

@ -218,6 +218,7 @@ public class TestQueueState {
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
.thenCallRealMethod();
when(application.isRunnable()).thenReturn(true);
return application;
}

View File

@ -157,6 +157,7 @@ public class TestQueueStateManager {
CommonNodeLabelsManager.NO_LABEL);
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
.thenCallRealMethod();
when(application.isRunnable()).thenReturn(true);
return application;
}
}