YARN-9930. Support max running app logic for CapacityScheduler. Contributed by Peter Bacsko
This commit is contained in:
parent
56d72adbdd
commit
8f1b70e367
|
@ -146,6 +146,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
|
||||
volatile Priority priority = Priority.newInstance(0);
|
||||
private Map<String, Float> userWeights = new HashMap<String, Float>();
|
||||
private int maxParallelApps;
|
||||
|
||||
public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
|
@ -390,6 +391,11 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
// and queue setting
|
||||
setupMaximumAllocation(configuration);
|
||||
|
||||
// Max parallel apps
|
||||
int queueMaxParallelApps =
|
||||
configuration.getMaxParallelAppsForQueue(getQueuePath());
|
||||
setMaxParallelApps(queueMaxParallelApps);
|
||||
|
||||
// initialized the queue state based on previous state, configured state
|
||||
// and its parent state.
|
||||
QueueState previous = getState();
|
||||
|
@ -1421,4 +1427,14 @@ public abstract class AbstractCSQueue implements CSQueue {
|
|||
public boolean getDefaultAppLifetimeWasSpecifiedInConfig() {
|
||||
return defaultAppLifetimeWasSpecifiedInConfig;
|
||||
}
|
||||
|
||||
public void setMaxParallelApps(int maxParallelApps) {
|
||||
this.maxParallelApps = maxParallelApps;
|
||||
}
|
||||
|
||||
public int getMaxParallelApps() {
|
||||
return maxParallelApps;
|
||||
}
|
||||
|
||||
abstract int getNumRunnableApps();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,436 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.PriorityQueue;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
|
||||
/**
|
||||
* Handles tracking and enforcement for user and queue maxRunningApps
|
||||
* constraints.
|
||||
*/
|
||||
public class CSMaxRunningAppsEnforcer {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
CSMaxRunningAppsEnforcer.class);
|
||||
|
||||
private final CapacityScheduler scheduler;
|
||||
|
||||
// Tracks the number of running applications by user.
|
||||
private final Map<String, Integer> usersNumRunnableApps;
|
||||
|
||||
private final ListMultimap<String, FiCaSchedulerApp> usersNonRunnableApps;
|
||||
|
||||
public CSMaxRunningAppsEnforcer(CapacityScheduler scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
this.usersNumRunnableApps = new HashMap<String, Integer>();
|
||||
this.usersNonRunnableApps = ArrayListMultimap.create();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether making the application runnable would exceed any
|
||||
* maxRunningApps limits. Also sets the "runnable" flag on the
|
||||
* attempt.
|
||||
*
|
||||
* @param attempt the app attempt being checked
|
||||
* @return true if the application is runnable; false otherwise
|
||||
*/
|
||||
public boolean checkRunnabilityWithUpdate(
|
||||
FiCaSchedulerApp attempt) {
|
||||
boolean attemptCanRun = !exceedUserMaxParallelApps(attempt.getUser())
|
||||
&& !exceedQueueMaxParallelApps(attempt.getCSLeafQueue());
|
||||
|
||||
attempt.setRunnable(attemptCanRun);
|
||||
|
||||
return attemptCanRun;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the number of user runnable apps exceeds the limitation.
|
||||
*
|
||||
* @param user the user name
|
||||
* @return true if the number hits the limit; false otherwise
|
||||
*/
|
||||
private boolean exceedUserMaxParallelApps(String user) {
|
||||
Integer userNumRunnable = usersNumRunnableApps.get(user);
|
||||
if (userNumRunnable == null) {
|
||||
userNumRunnable = 0;
|
||||
}
|
||||
if (userNumRunnable >= getUserMaxParallelApps(user)) {
|
||||
LOG.info("Maximum runnable apps exceeded for user {}", user);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursively checks whether the number of queue runnable apps exceeds the
|
||||
* limitation.
|
||||
*
|
||||
* @param queue the current queue
|
||||
* @return true if the number hits the limit; false otherwise
|
||||
*/
|
||||
private boolean exceedQueueMaxParallelApps(AbstractCSQueue queue) {
|
||||
// Check queue and all parent queues
|
||||
while (queue != null) {
|
||||
if (queue.getNumRunnableApps() >= queue.getMaxParallelApps()) {
|
||||
LOG.info("Maximum runnable apps exceeded for queue {}",
|
||||
queue.getQueuePath());
|
||||
return true;
|
||||
}
|
||||
queue = (AbstractCSQueue) queue.getParent();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public void trackApp(FiCaSchedulerApp app) {
|
||||
if (app.isRunnable()) {
|
||||
trackRunnableApp(app);
|
||||
} else {
|
||||
trackNonRunnableApp(app);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Tracks the given new runnable app for purposes of maintaining max running
|
||||
* app limits.
|
||||
*/
|
||||
private void trackRunnableApp(FiCaSchedulerApp app) {
|
||||
String user = app.getUser();
|
||||
AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
|
||||
// Increment running counts for all parent queues
|
||||
ParentQueue parent = (ParentQueue) queue.getParent();
|
||||
while (parent != null) {
|
||||
parent.incrementRunnableApps();
|
||||
parent = (ParentQueue) parent.getParent();
|
||||
}
|
||||
|
||||
Integer userNumRunnable = usersNumRunnableApps.get(user);
|
||||
usersNumRunnableApps.put(user, (userNumRunnable == null ? 0
|
||||
: userNumRunnable) + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tracks the given new non runnable app so that it can be made runnable when
|
||||
* it would not violate max running app limits.
|
||||
*/
|
||||
private void trackNonRunnableApp(FiCaSchedulerApp app) {
|
||||
String user = app.getUser();
|
||||
usersNonRunnableApps.put(user, app);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is called after reloading the allocation configuration when the
|
||||
* scheduler is reinitialized
|
||||
*
|
||||
* Checks to see whether any non-runnable applications become runnable
|
||||
* now that the max running apps of given queue has been changed
|
||||
*
|
||||
* Runs in O(n) where n is the number of apps that are non-runnable and in
|
||||
* the queues that went from having no slack to having slack.
|
||||
*/
|
||||
|
||||
public void updateRunnabilityOnReload() {
|
||||
ParentQueue rootQueue = (ParentQueue) scheduler.getRootQueue();
|
||||
List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
|
||||
new ArrayList<List<FiCaSchedulerApp>>();
|
||||
|
||||
gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
|
||||
|
||||
updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks to see whether any other applications runnable now that the given
|
||||
* application has been removed from the given queue. And makes them so.
|
||||
*
|
||||
* Runs in O(n log(n)) where n is the number of queues that are under the
|
||||
* highest queue that went from having no slack to having slack.
|
||||
*/
|
||||
public void updateRunnabilityOnAppRemoval(FiCaSchedulerApp app) {
|
||||
// childqueueX might have no pending apps itself, but if a queue higher up
|
||||
// in the hierarchy parentqueueY has a maxRunningApps set, an app completion
|
||||
// in childqueueX could allow an app in some other distant child of
|
||||
// parentqueueY to become runnable.
|
||||
// An app removal will only possibly allow another app to become runnable if
|
||||
// the queue was already at its max before the removal.
|
||||
// Thus we find the ancestor queue highest in the tree for which the app
|
||||
// that was at its maxRunningApps before the removal.
|
||||
LeafQueue queue = app.getCSLeafQueue();
|
||||
AbstractCSQueue highestQueueWithAppsNowRunnable =
|
||||
(queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1)
|
||||
? queue : null;
|
||||
|
||||
ParentQueue parent = (ParentQueue) queue.getParent();
|
||||
while (parent != null) {
|
||||
if (parent.getNumRunnableApps() == parent.getMaxParallelApps() - 1) {
|
||||
highestQueueWithAppsNowRunnable = parent;
|
||||
}
|
||||
parent = (ParentQueue) parent.getParent();
|
||||
}
|
||||
|
||||
List<List<FiCaSchedulerApp>> appsNowMaybeRunnable =
|
||||
new ArrayList<List<FiCaSchedulerApp>>();
|
||||
|
||||
// Compile lists of apps which may now be runnable
|
||||
// We gather lists instead of building a set of all non-runnable apps so
|
||||
// that this whole operation can be O(number of queues) instead of
|
||||
// O(number of apps)
|
||||
if (highestQueueWithAppsNowRunnable != null) {
|
||||
gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
|
||||
appsNowMaybeRunnable);
|
||||
}
|
||||
String user = app.getUser();
|
||||
Integer userNumRunning = usersNumRunnableApps.get(user);
|
||||
if (userNumRunning == null) {
|
||||
userNumRunning = 0;
|
||||
}
|
||||
if (userNumRunning == getUserMaxParallelApps(user) - 1) {
|
||||
List<FiCaSchedulerApp> userWaitingApps = usersNonRunnableApps.get(user);
|
||||
if (userWaitingApps != null) {
|
||||
appsNowMaybeRunnable.add(userWaitingApps);
|
||||
}
|
||||
}
|
||||
|
||||
updateAppsRunnability(appsNowMaybeRunnable,
|
||||
appsNowMaybeRunnable.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks to see whether applications are runnable now by iterating
|
||||
* through each one of them and check if the queue and user have slack.
|
||||
*
|
||||
* if we know how many apps can be runnable, there is no need to iterate
|
||||
* through all apps, maxRunnableApps is used to break out of the iteration.
|
||||
*/
|
||||
private void updateAppsRunnability(List<List<FiCaSchedulerApp>>
|
||||
appsNowMaybeRunnable, int maxRunnableApps) {
|
||||
// Scan through and check whether this means that any apps are now runnable
|
||||
Iterator<FiCaSchedulerApp> iter = new MultiListStartTimeIterator(
|
||||
appsNowMaybeRunnable);
|
||||
FiCaSchedulerApp prev = null;
|
||||
List<FiCaSchedulerApp> noLongerPendingApps = new ArrayList<>();
|
||||
while (iter.hasNext()) {
|
||||
FiCaSchedulerApp next = iter.next();
|
||||
if (next == prev) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (checkRunnabilityWithUpdate(next)) {
|
||||
LeafQueue nextQueue = next.getCSLeafQueue();
|
||||
LOG.info("{} is now runnable in {}",
|
||||
next.getApplicationAttemptId(), nextQueue);
|
||||
trackRunnableApp(next);
|
||||
FiCaSchedulerApp appSched = next;
|
||||
nextQueue.submitApplicationAttempt(next, next.getUser());
|
||||
noLongerPendingApps.add(appSched);
|
||||
|
||||
if (noLongerPendingApps.size() >= maxRunnableApps) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
prev = next;
|
||||
}
|
||||
|
||||
// We remove the apps from their pending lists afterwards so that we don't
|
||||
// pull them out from under the iterator. If they are not in these lists
|
||||
// in the first place, there is a bug.
|
||||
for (FiCaSchedulerApp appSched : noLongerPendingApps) {
|
||||
if (!(appSched.getCSLeafQueue().removeNonRunnableApp(appSched))) {
|
||||
LOG.error("Can't make app runnable that does not already exist in queue"
|
||||
+ " as non-runnable: {}. This should never happen.",
|
||||
appSched.getApplicationAttemptId());
|
||||
}
|
||||
|
||||
if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
|
||||
LOG.error("Waiting app {} expected to be in "
|
||||
+ "usersNonRunnableApps, but was not. This should never happen.",
|
||||
appSched.getApplicationAttemptId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void untrackApp(FiCaSchedulerApp app) {
|
||||
if (app.isRunnable()) {
|
||||
untrackRunnableApp(app);
|
||||
} else {
|
||||
untrackNonRunnableApp(app);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the relevant tracking variables after a runnable app with the given
|
||||
* queue and user has been removed.
|
||||
*/
|
||||
private void untrackRunnableApp(FiCaSchedulerApp app) {
|
||||
// Update usersRunnableApps
|
||||
String user = app.getUser();
|
||||
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
|
||||
if (newUserNumRunning == 0) {
|
||||
usersNumRunnableApps.remove(user);
|
||||
} else {
|
||||
usersNumRunnableApps.put(user, newUserNumRunning);
|
||||
}
|
||||
|
||||
// Update runnable app bookkeeping for queues
|
||||
AbstractCSQueue queue = (AbstractCSQueue) app.getQueue();
|
||||
ParentQueue parent = (ParentQueue) queue.getParent();
|
||||
while (parent != null) {
|
||||
parent.decrementRunnableApps();
|
||||
parent = (ParentQueue) parent.getParent();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops tracking the given non-runnable app.
|
||||
*/
|
||||
private void untrackNonRunnableApp(FiCaSchedulerApp app) {
|
||||
usersNonRunnableApps.remove(app.getUser(), app);
|
||||
}
|
||||
|
||||
/**
|
||||
* Traverses the queue hierarchy under the given queue to gather all lists
|
||||
* of non-runnable applications.
|
||||
*/
|
||||
private void gatherPossiblyRunnableAppLists(AbstractCSQueue queue,
|
||||
List<List<FiCaSchedulerApp>> appLists) {
|
||||
if (queue.getNumRunnableApps() < queue.getMaxParallelApps()) {
|
||||
if (queue instanceof LeafQueue) {
|
||||
appLists.add(
|
||||
((LeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
|
||||
} else {
|
||||
for (CSQueue child : queue.getChildQueues()) {
|
||||
gatherPossiblyRunnableAppLists((AbstractCSQueue) child, appLists);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int getUserMaxParallelApps(String user) {
|
||||
CapacitySchedulerConfiguration conf = scheduler.getConfiguration();
|
||||
if (conf == null) {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
int userMaxParallelApps = conf.getMaxParallelAppsForUser(user);
|
||||
|
||||
return userMaxParallelApps;
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes a list of lists, each of which is ordered by start time, and returns
|
||||
* their elements in order of start time.
|
||||
*
|
||||
* We maintain positions in each of the lists. Each next() call advances
|
||||
* the position in one of the lists. We maintain a heap that orders lists
|
||||
* by the start time of the app in the current position in that list.
|
||||
* This allows us to pick which list to advance in O(log(num lists)) instead
|
||||
* of O(num lists) time.
|
||||
*/
|
||||
static class MultiListStartTimeIterator implements
|
||||
Iterator<FiCaSchedulerApp> {
|
||||
|
||||
private List<FiCaSchedulerApp>[] appLists;
|
||||
private int[] curPositionsInAppLists;
|
||||
private PriorityQueue<IndexAndTime> appListsByCurStartTime;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
MultiListStartTimeIterator(List<List<FiCaSchedulerApp>> appListList) {
|
||||
appLists = appListList.toArray(new List[appListList.size()]);
|
||||
curPositionsInAppLists = new int[appLists.length];
|
||||
appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
|
||||
for (int i = 0; i < appLists.length; i++) {
|
||||
long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0)
|
||||
.getStartTime();
|
||||
appListsByCurStartTime.add(new IndexAndTime(i, time));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return !appListsByCurStartTime.isEmpty()
|
||||
&& appListsByCurStartTime.peek().time != Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FiCaSchedulerApp next() {
|
||||
IndexAndTime indexAndTime = appListsByCurStartTime.remove();
|
||||
int nextListIndex = indexAndTime.index;
|
||||
FiCaSchedulerApp next = appLists[nextListIndex]
|
||||
.get(curPositionsInAppLists[nextListIndex]);
|
||||
curPositionsInAppLists[nextListIndex]++;
|
||||
|
||||
if (curPositionsInAppLists[nextListIndex] <
|
||||
appLists[nextListIndex].size()) {
|
||||
indexAndTime.time = appLists[nextListIndex]
|
||||
.get(curPositionsInAppLists[nextListIndex]).getStartTime();
|
||||
} else {
|
||||
indexAndTime.time = Long.MAX_VALUE;
|
||||
}
|
||||
appListsByCurStartTime.add(indexAndTime);
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException("Remove not supported");
|
||||
}
|
||||
|
||||
private static class IndexAndTime implements Comparable<IndexAndTime> {
|
||||
private int index;
|
||||
private long time;
|
||||
|
||||
IndexAndTime(int index, long time) {
|
||||
this.index = index;
|
||||
this.time = time;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(IndexAndTime o) {
|
||||
return time < o.time ? -1 : (time > o.time ? 1 : 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof IndexAndTime)) {
|
||||
return false;
|
||||
}
|
||||
IndexAndTime other = (IndexAndTime)o;
|
||||
return other.time == time;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return (int)time;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -242,8 +242,11 @@ public class CapacityScheduler extends
|
|||
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
|
||||
private long asyncMaxPendingBacklogs;
|
||||
|
||||
private CSMaxRunningAppsEnforcer maxRunningEnforcer;
|
||||
|
||||
public CapacityScheduler() {
|
||||
super(CapacityScheduler.class.getName());
|
||||
this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -483,6 +486,7 @@ public class CapacityScheduler extends
|
|||
|
||||
super.reinitialize(newConf, rmContext);
|
||||
}
|
||||
maxRunningEnforcer.updateRunnabilityOnReload();
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -1080,6 +1084,9 @@ public class CapacityScheduler extends
|
|||
// SchedulerApplication#setCurrentAppAttempt.
|
||||
attempt.setPriority(application.getPriority());
|
||||
|
||||
maxRunningEnforcer.checkRunnabilityWithUpdate(attempt);
|
||||
maxRunningEnforcer.trackApp(attempt);
|
||||
|
||||
queue.submitApplicationAttempt(attempt, application.getUser());
|
||||
LOG.info("Added Application Attempt " + applicationAttemptId
|
||||
+ " to scheduler from user " + application.getUser() + " in queue "
|
||||
|
@ -1173,8 +1180,13 @@ public class CapacityScheduler extends
|
|||
LOG.error(
|
||||
"Cannot finish application " + "from non-leaf queue: "
|
||||
+ csQueue.getQueuePath());
|
||||
} else{
|
||||
} else {
|
||||
csQueue.finishApplicationAttempt(attempt, csQueue.getQueuePath());
|
||||
|
||||
maxRunningEnforcer.untrackApp(attempt);
|
||||
if (attempt.isRunnable()) {
|
||||
maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
@ -3268,4 +3280,9 @@ public class CapacityScheduler extends
|
|||
public int getNumAsyncSchedulerThreads() {
|
||||
return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setMaxRunningAppsEnforcer(CSMaxRunningAppsEnforcer enforcer) {
|
||||
this.maxRunningEnforcer = enforcer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -379,6 +379,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
public static final Pattern RESOURCE_PATTERN =
|
||||
Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
|
||||
|
||||
public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps";
|
||||
|
||||
public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Different resource types supported.
|
||||
*/
|
||||
|
@ -413,7 +417,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT;
|
||||
return queueName;
|
||||
}
|
||||
|
||||
|
||||
static String getUserPrefix(String user) {
|
||||
return PREFIX + "user." + user + DOT;
|
||||
}
|
||||
|
||||
private String getNodeLabelPrefix(String queue, String label) {
|
||||
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
|
||||
return getQueuePrefix(queue);
|
||||
|
@ -1387,6 +1395,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
|
||||
}
|
||||
|
||||
public Integer getMaxParallelAppsForQueue(String queue) {
|
||||
int defaultMaxParallelAppsForQueue =
|
||||
getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
|
||||
DEFAULT_MAX_PARALLEL_APPLICATIONS);
|
||||
|
||||
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
|
||||
+ MAX_PARALLEL_APPLICATIONS);
|
||||
|
||||
return (maxParallelAppsForQueue != null) ?
|
||||
Integer.parseInt(maxParallelAppsForQueue)
|
||||
: defaultMaxParallelAppsForQueue;
|
||||
}
|
||||
|
||||
public Integer getMaxParallelAppsForUser(String user) {
|
||||
int defaultMaxParallelAppsForUser =
|
||||
getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
|
||||
DEFAULT_MAX_PARALLEL_APPLICATIONS);
|
||||
String maxParallelAppsForUser = get(getUserPrefix(user)
|
||||
+ MAX_PARALLEL_APPLICATIONS);
|
||||
|
||||
return (maxParallelAppsForUser != null) ?
|
||||
Integer.parseInt(maxParallelAppsForUser)
|
||||
: defaultMaxParallelAppsForUser;
|
||||
}
|
||||
|
||||
private static final String PREEMPTION_CONFIG_PREFIX =
|
||||
"yarn.resourcemanager.monitor.capacity.preemption.";
|
||||
|
||||
|
|
|
@ -129,6 +129,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
List<AppPriorityACLGroup> priorityAcls =
|
||||
new ArrayList<AppPriorityACLGroup>();
|
||||
|
||||
private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
|
||||
private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public LeafQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
|
@ -159,6 +162,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
setupQueueConfigs(clusterResource, csContext.getConfiguration());
|
||||
}
|
||||
|
||||
@SuppressWarnings("checkstyle:nowhitespaceafter")
|
||||
protected void setupQueueConfigs(Resource clusterResource,
|
||||
CapacitySchedulerConfiguration conf) throws
|
||||
IOException {
|
||||
|
@ -289,7 +293,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
+ " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
|
||||
+ "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
|
||||
+ " [= (int)(maxApplications * (userLimit / 100.0f) * "
|
||||
+ "userLimitFactor) ]" + "\n" + "usedCapacity = "
|
||||
+ "userLimitFactor) ]" + "\n"
|
||||
+ "maxParallelApps = " + getMaxParallelApps() + "\n"
|
||||
+ "usedCapacity = " +
|
||||
+ queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
|
||||
+ "(clusterResourceMemory * absoluteCapacity)]" + "\n"
|
||||
+ "absoluteUsedCapacity = " + absoluteUsedCapacity
|
||||
|
@ -386,7 +392,8 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
public int getNumApplications() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return getNumPendingApplications() + getNumActiveApplications();
|
||||
return getNumPendingApplications() + getNumActiveApplications() +
|
||||
getNumNonRunnableApps();
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
|
@ -887,16 +894,28 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void addApplicationAttempt(FiCaSchedulerApp application,
|
||||
User user) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
applicationAttemptMap.put(application.getApplicationAttemptId(),
|
||||
application);
|
||||
|
||||
if (application.isRunnable()) {
|
||||
runnableApps.add(application);
|
||||
LOG.debug("Adding runnable application: {}",
|
||||
application.getApplicationAttemptId());
|
||||
} else {
|
||||
nonRunnableApps.add(application);
|
||||
LOG.info("Application attempt {} is not runnable,"
|
||||
+ " parallel limit reached", application.getApplicationAttemptId());
|
||||
return;
|
||||
}
|
||||
|
||||
// Accept
|
||||
user.submitApplication();
|
||||
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
|
||||
applicationAttemptMap.put(application.getApplicationAttemptId(),
|
||||
application);
|
||||
|
||||
// Activate applications
|
||||
if (Resources.greaterThan(resourceCalculator, lastClusterResource,
|
||||
|
@ -917,7 +936,9 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
.getPendingApplications() + " #user-active-applications: " + user
|
||||
.getActiveApplications() + " #queue-pending-applications: "
|
||||
+ getNumPendingApplications() + " #queue-active-applications: "
|
||||
+ getNumActiveApplications());
|
||||
+ getNumActiveApplications()
|
||||
+ " #queue-nonrunnable-applications: "
|
||||
+ getNumNonRunnableApps());
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -950,6 +971,15 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
// which is caused by wrong invoking order, will fix UT separately
|
||||
User user = usersManager.getUserAndAddIfAbsent(userName);
|
||||
|
||||
boolean runnable = runnableApps.remove(application);
|
||||
if (!runnable) {
|
||||
// removeNonRunnableApp acquires the write lock again, which is fine
|
||||
if (!removeNonRunnableApp(application)) {
|
||||
LOG.error("Given app to remove " + application +
|
||||
" does not exist in queue " + getQueuePath());
|
||||
}
|
||||
}
|
||||
|
||||
String partitionName = application.getAppAMNodePartitionName();
|
||||
boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
|
||||
if (!wasActive) {
|
||||
|
@ -2229,4 +2259,43 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
usedSeconds);
|
||||
metrics.updatePreemptedForCustomResources(containerResource);
|
||||
}
|
||||
|
||||
@Override
|
||||
int getNumRunnableApps() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return runnableApps.size();
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
int getNumNonRunnableApps() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return nonRunnableApps.size();
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
boolean removeNonRunnableApp(FiCaSchedulerApp app) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
return nonRunnableApps.remove(app);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
|
||||
List<FiCaSchedulerApp> appsToReturn = new ArrayList<>();
|
||||
readLock.lock();
|
||||
try {
|
||||
appsToReturn.addAll(nonRunnableApps);
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
return appsToReturn;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -93,6 +93,8 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
|
||||
private long lastSkipQueueDebugLoggingTimestamp = -1;
|
||||
|
||||
private int runnableApps;
|
||||
|
||||
public ParentQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
super(cs, queueName, parent, old);
|
||||
|
@ -1382,4 +1384,32 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
public QueueOrderingPolicy getQueueOrderingPolicy() {
|
||||
return queueOrderingPolicy;
|
||||
}
|
||||
|
||||
@Override
|
||||
int getNumRunnableApps() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return runnableApps;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void incrementRunnableApps() {
|
||||
writeLock.lock();
|
||||
try {
|
||||
runnableApps++;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void decrementRunnableApps() {
|
||||
writeLock.lock();
|
||||
try {
|
||||
runnableApps--;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,6 +112,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
|
||||
private AbstractContainerAllocator containerAllocator;
|
||||
|
||||
private boolean runnable;
|
||||
|
||||
/**
|
||||
* to hold the message if its app doesn't not get container from a node
|
||||
*/
|
||||
|
@ -139,6 +141,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
|
||||
ActivitiesManager activitiesManager) {
|
||||
super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
|
||||
this.runnable = true;
|
||||
|
||||
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
|
||||
|
||||
|
@ -1219,4 +1222,22 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void setRunnable(boolean runnable) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
this.runnable = runnable;
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isRunnable() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return runnable;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSMaxRunningAppsEnforcer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
|
@ -175,6 +176,9 @@ public class TestReservationSystem extends
|
|||
|
||||
CapacityScheduler cs = Mockito.spy(new CapacityScheduler());
|
||||
cs.setConf(conf);
|
||||
CSMaxRunningAppsEnforcer enforcer =
|
||||
Mockito.mock(CSMaxRunningAppsEnforcer.class);
|
||||
cs.setMaxRunningAppsEnforcer(enforcer);
|
||||
|
||||
mockRMContext = ReservationSystemTestUtil.createRMContext(conf);
|
||||
|
||||
|
|
|
@ -184,6 +184,7 @@ public class TestApplicationLimits {
|
|||
doReturn(amResource).when(application).getAMResource(
|
||||
CommonNodeLabelsManager.NO_LABEL);
|
||||
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
|
||||
when(application.isRunnable()).thenReturn(true);
|
||||
return application;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,278 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestCSMaxRunningAppsEnforcer {
|
||||
private CapacitySchedulerQueueManager queueManager;
|
||||
private CSMaxRunningAppsEnforcer maxAppsEnforcer;
|
||||
private int appNum;
|
||||
private ControlledClock clock;
|
||||
private RMContext rmContext;
|
||||
private CapacityScheduler scheduler;
|
||||
private ActivitiesManager activitiesManager;
|
||||
private CapacitySchedulerConfiguration csConfig;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
csConfig = new CapacitySchedulerConfiguration();
|
||||
rmContext = mock(RMContext.class);
|
||||
when(rmContext.getYarnConfiguration()).thenReturn(csConfig);
|
||||
when(rmContext.getRMApps()).thenReturn(new ConcurrentHashMap<>());
|
||||
clock = new ControlledClock();
|
||||
scheduler = mock(CapacityScheduler.class);
|
||||
when(rmContext.getScheduler()).thenReturn(scheduler);
|
||||
when(scheduler.getConf()).thenReturn(csConfig);
|
||||
when(scheduler.getConfig()).thenReturn(csConfig);
|
||||
when(scheduler.getConfiguration()).thenReturn(csConfig);
|
||||
when(scheduler.getResourceCalculator()).thenReturn(
|
||||
new DefaultResourceCalculator());
|
||||
when(scheduler.getRMContext()).thenReturn(rmContext);
|
||||
when(scheduler.getClusterResource())
|
||||
.thenReturn(Resource.newInstance(16384, 8));
|
||||
when(scheduler.getMinimumAllocation())
|
||||
.thenReturn(Resource.newInstance(1024, 1));
|
||||
when(scheduler.getMinimumResourceCapability())
|
||||
.thenReturn(Resource.newInstance(1024, 1));
|
||||
activitiesManager = mock(ActivitiesManager.class);
|
||||
maxAppsEnforcer = new CSMaxRunningAppsEnforcer(scheduler);
|
||||
appNum = 0;
|
||||
setupQueues(csConfig);
|
||||
RMNodeLabelsManager labelManager = mock(RMNodeLabelsManager.class);
|
||||
AppPriorityACLsManager appPriorityACLManager =
|
||||
mock(AppPriorityACLsManager.class);
|
||||
when(rmContext.getNodeLabelManager()).thenReturn(labelManager);
|
||||
when(labelManager.getResourceByLabel(anyString(), any(Resource.class)))
|
||||
.thenReturn(Resource.newInstance(16384, 8));
|
||||
queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager,
|
||||
appPriorityACLManager);
|
||||
queueManager.setCapacitySchedulerContext(scheduler);
|
||||
queueManager.initializeQueues(csConfig);
|
||||
}
|
||||
|
||||
private void setupQueues(CapacitySchedulerConfiguration config) {
|
||||
config.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"queue1", "queue2"});
|
||||
config.setQueues("root.queue1", new String[] {"subqueue1", "subqueue2"});
|
||||
config.setQueues("root.queue1.subqueue1", new String[] {"leaf1"});
|
||||
config.setQueues("root.queue1.subqueue2", new String[] {"leaf2"});
|
||||
config.setFloat(PREFIX + "root.capacity", 100.0f);
|
||||
config.setFloat(PREFIX + "root.queue1.capacity", 50.0f);
|
||||
config.setFloat(PREFIX + "root.queue2.capacity", 50.0f);
|
||||
config.setFloat(PREFIX + "root.queue1.subqueue1.capacity", 50.0f);
|
||||
config.setFloat(PREFIX + "root.queue1.subqueue2.capacity", 50.0f);
|
||||
config.setFloat(PREFIX + "root.queue1.subqueue1.leaf1.capacity", 100.0f);
|
||||
config.setFloat(PREFIX + "root.queue1.subqueue2.leaf2.capacity", 100.0f);
|
||||
}
|
||||
|
||||
private FiCaSchedulerApp addApp(LeafQueue queue, String user) {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, appNum++);
|
||||
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
|
||||
|
||||
FiCaSchedulerApp attempt = new FiCaSchedulerApp(attId,
|
||||
user, queue, queue.getAbstractUsersManager(),
|
||||
rmContext, Priority.newInstance(0), false,
|
||||
activitiesManager) {
|
||||
|
||||
private final long startTime = clock.getTime();
|
||||
|
||||
@Override
|
||||
public long getStartTime() {
|
||||
return startTime;
|
||||
}
|
||||
};
|
||||
|
||||
maxAppsEnforcer.checkRunnabilityWithUpdate(attempt);
|
||||
maxAppsEnforcer.trackApp(attempt);
|
||||
|
||||
queue.submitApplicationAttempt(attempt, attempt.getUser());
|
||||
|
||||
return attempt;
|
||||
}
|
||||
|
||||
private void removeApp(FiCaSchedulerApp attempt) {
|
||||
LeafQueue queue = attempt.getCSLeafQueue();
|
||||
queue.finishApplicationAttempt(attempt, queue.getQueuePath());
|
||||
maxAppsEnforcer.untrackApp(attempt);
|
||||
maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveDoesNotEnableAnyApp() {
|
||||
ParentQueue root =
|
||||
(ParentQueue) queueManager.getRootQueue();
|
||||
LeafQueue leaf1 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue1.leaf1");
|
||||
LeafQueue leaf2 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue2.leaf2");
|
||||
root.setMaxParallelApps(2);
|
||||
leaf1.setMaxParallelApps(1);
|
||||
leaf2.setMaxParallelApps(1);
|
||||
|
||||
FiCaSchedulerApp app1 = addApp(leaf1, "user");
|
||||
addApp(leaf2, "user");
|
||||
addApp(leaf2, "user");
|
||||
assertEquals(1, leaf1.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumNonRunnableApps());
|
||||
|
||||
removeApp(app1);
|
||||
assertEquals(0, leaf1.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumNonRunnableApps());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveEnablesAppOnCousinQueue() {
|
||||
LeafQueue leaf1 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue1.leaf1");
|
||||
LeafQueue leaf2 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue2.leaf2");
|
||||
ParentQueue queue1 = (ParentQueue) queueManager
|
||||
.getQueueByFullName("root.queue1");
|
||||
queue1.setMaxParallelApps(2);
|
||||
|
||||
FiCaSchedulerApp app1 = addApp(leaf1, "user");
|
||||
addApp(leaf2, "user");
|
||||
addApp(leaf2, "user");
|
||||
assertEquals(1, leaf1.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumNonRunnableApps());
|
||||
|
||||
removeApp(app1);
|
||||
assertEquals(0, leaf1.getNumRunnableApps());
|
||||
assertEquals(2, leaf2.getNumRunnableApps());
|
||||
assertEquals(0, leaf2.getNumNonRunnableApps());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveEnablesOneByQueueOneByUser() {
|
||||
LeafQueue leaf1 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue1.leaf1");
|
||||
LeafQueue leaf2 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue2.leaf2");
|
||||
leaf1.setMaxParallelApps(2);
|
||||
//userMaxApps.put("user1", 1);
|
||||
csConfig.setInt(PREFIX + "user.user1.max-parallel-apps", 1);
|
||||
|
||||
FiCaSchedulerApp app1 = addApp(leaf1, "user1");
|
||||
addApp(leaf1, "user2");
|
||||
addApp(leaf1, "user3");
|
||||
addApp(leaf2, "user1");
|
||||
assertEquals(2, leaf1.getNumRunnableApps());
|
||||
assertEquals(1, leaf1.getNumNonRunnableApps());
|
||||
assertEquals(1, leaf2.getNumNonRunnableApps());
|
||||
|
||||
removeApp(app1);
|
||||
assertEquals(2, leaf1.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumRunnableApps());
|
||||
assertEquals(0, leaf1.getNumNonRunnableApps());
|
||||
assertEquals(0, leaf2.getNumNonRunnableApps());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveEnablingOrderedByStartTime() {
|
||||
LeafQueue leaf1 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue1.leaf1");
|
||||
LeafQueue leaf2 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue2.leaf2");
|
||||
ParentQueue queue1 = (ParentQueue) queueManager
|
||||
.getQueueByFullName("root.queue1");
|
||||
queue1.setMaxParallelApps(2);
|
||||
FiCaSchedulerApp app1 = addApp(leaf1, "user");
|
||||
addApp(leaf2, "user");
|
||||
addApp(leaf2, "user");
|
||||
clock.tickSec(20);
|
||||
addApp(leaf1, "user");
|
||||
assertEquals(1, leaf1.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumRunnableApps());
|
||||
assertEquals(1, leaf1.getNumNonRunnableApps());
|
||||
assertEquals(1, leaf2.getNumNonRunnableApps());
|
||||
removeApp(app1);
|
||||
assertEquals(0, leaf1.getNumRunnableApps());
|
||||
assertEquals(2, leaf2.getNumRunnableApps());
|
||||
assertEquals(0, leaf2.getNumNonRunnableApps());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleAppsWaitingOnCousinQueue() {
|
||||
LeafQueue leaf1 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue1.leaf1");
|
||||
LeafQueue leaf2 = (LeafQueue) queueManager
|
||||
.getQueueByFullName("root.queue1.subqueue2.leaf2");
|
||||
ParentQueue queue1 = (ParentQueue) queueManager
|
||||
.getQueueByFullName("root.queue1");
|
||||
queue1.setMaxParallelApps(2);
|
||||
FiCaSchedulerApp app1 = addApp(leaf1, "user");
|
||||
addApp(leaf2, "user");
|
||||
addApp(leaf2, "user");
|
||||
addApp(leaf2, "user");
|
||||
assertEquals(1, leaf1.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumRunnableApps());
|
||||
assertEquals(2, leaf2.getNumNonRunnableApps());
|
||||
removeApp(app1);
|
||||
assertEquals(0, leaf1.getNumRunnableApps());
|
||||
assertEquals(2, leaf2.getNumRunnableApps());
|
||||
assertEquals(1, leaf2.getNumNonRunnableApps());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiListStartTimeIteratorEmptyAppLists() {
|
||||
List<List<FiCaSchedulerApp>> lists =
|
||||
new ArrayList<List<FiCaSchedulerApp>>();
|
||||
lists.add(Arrays.asList(mockAppAttempt(1)));
|
||||
lists.add(Arrays.asList(mockAppAttempt(2)));
|
||||
Iterator<FiCaSchedulerApp> iter =
|
||||
new CSMaxRunningAppsEnforcer.MultiListStartTimeIterator(lists);
|
||||
assertEquals(1, iter.next().getStartTime());
|
||||
assertEquals(2, iter.next().getStartTime());
|
||||
}
|
||||
|
||||
private FiCaSchedulerApp mockAppAttempt(long startTime) {
|
||||
FiCaSchedulerApp schedApp = mock(FiCaSchedulerApp.class);
|
||||
when(schedApp.getStartTime()).thenReturn(startTime);
|
||||
return schedApp;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,312 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class TestCapacitySchedulerMaxParallelApps {
|
||||
private CapacitySchedulerConfiguration conf;
|
||||
private MockRM rm;
|
||||
private MockNM nm1;
|
||||
|
||||
private RMApp app1;
|
||||
private MockAM am1;
|
||||
private RMApp app2;
|
||||
private MockAM am2;
|
||||
private RMApp app3;
|
||||
private RMAppAttempt attempt3;
|
||||
private RMApp app4;
|
||||
private RMAppAttempt attempt4;
|
||||
|
||||
private ParentQueue rootQueue;
|
||||
private LeafQueue defaultQueue;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
CapacitySchedulerConfiguration config =
|
||||
new CapacitySchedulerConfiguration();
|
||||
config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
DominantResourceCalculator.class.getName());
|
||||
|
||||
conf = new CapacitySchedulerConfiguration(config);
|
||||
}
|
||||
|
||||
@After
|
||||
public void after() {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMaxParallelAppsExceedsQueueSetting() throws Exception {
|
||||
conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
|
||||
executeCommonStepsAndChecks();
|
||||
testWhenSettingsExceeded();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMaxParallelAppsExceedsDefaultQueueSetting()
|
||||
throws Exception {
|
||||
conf.setInt("yarn.scheduler.capacity.max-parallel-apps", 2);
|
||||
executeCommonStepsAndChecks();
|
||||
testWhenSettingsExceeded();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMaxParallelAppsExceedsUserSetting() throws Exception {
|
||||
conf.setInt("yarn.scheduler.capacity.user.testuser.max-parallel-apps", 2);
|
||||
executeCommonStepsAndChecks();
|
||||
testWhenSettingsExceeded();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMaxParallelAppsExceedsDefaultUserSetting() throws Exception {
|
||||
conf.setInt("yarn.scheduler.capacity.user.max-parallel-apps", 2);
|
||||
executeCommonStepsAndChecks();
|
||||
testWhenSettingsExceeded();
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMaxParallelAppsWhenReloadingConfig() throws Exception {
|
||||
conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
|
||||
|
||||
executeCommonStepsAndChecks();
|
||||
|
||||
RMContext rmContext = rm.getRMContext();
|
||||
// Disable parallel apps setting + max out AM percent
|
||||
conf.unset("yarn.scheduler.capacity.root.default.max-parallel-apps");
|
||||
conf.setFloat(PREFIX + "maximum-am-resource-percent", 1.0f);
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
cs.reinitialize(conf, rmContext);
|
||||
|
||||
// Both app #3 and app #4 should transition to RUNNABLE
|
||||
launchAMandWaitForRunning(app3, attempt3, nm1);
|
||||
launchAMandWaitForRunning(app4, attempt4, nm1);
|
||||
verifyRunningAndAcceptedApps(4, 0);
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testMaxAppsReachedWithNonRunnableApps() throws Exception {
|
||||
conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2);
|
||||
conf.setInt("yarn.scheduler.capacity.root.default.maximum-applications", 4);
|
||||
executeCommonStepsAndChecks();
|
||||
|
||||
RMApp app5 = MockRMAppSubmitter.submit(rm,
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
|
||||
.withAppName("app5")
|
||||
.withUser("testuser")
|
||||
.withQueue("default")
|
||||
.withWaitForAppAcceptedState(false)
|
||||
.build());
|
||||
|
||||
rm.waitForState(app5.getApplicationId(), RMAppState.FAILED);
|
||||
}
|
||||
|
||||
private void executeCommonStepsAndChecks() throws Exception {
|
||||
rm = new MockRM(conf);
|
||||
rm.start();
|
||||
|
||||
nm1 = rm.registerNode("h1:1234", 4096, 8);
|
||||
rm.registerNode("h2:1234", 4096, 8);
|
||||
rm.registerNode("h3:1234", 4096, 8);
|
||||
|
||||
rm.drainEvents();
|
||||
|
||||
app1 = MockRMAppSubmitter.submit(rm,
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
|
||||
.withAppName("app1")
|
||||
.withUser("testuser")
|
||||
.withQueue("default")
|
||||
.build());
|
||||
|
||||
am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
app2 = MockRMAppSubmitter.submit(rm,
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
|
||||
.withAppName("app2")
|
||||
.withUser("testuser")
|
||||
.withQueue("default")
|
||||
.build());
|
||||
am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
|
||||
|
||||
app3 = MockRMAppSubmitter.submit(rm,
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
|
||||
.withAppName("app3")
|
||||
.withUser("testuser")
|
||||
.withQueue("default")
|
||||
.build());
|
||||
attempt3 = MockRM.waitForAttemptScheduled(app3, rm);
|
||||
|
||||
app4 = MockRMAppSubmitter.submit(rm,
|
||||
MockRMAppSubmissionData.Builder.createWithMemory(512, rm)
|
||||
.withAppName("app4")
|
||||
.withUser("testuser")
|
||||
.withQueue("default")
|
||||
.build());
|
||||
attempt4 = MockRM.waitForAttemptScheduled(app4, rm);
|
||||
|
||||
// Check that app attempt #3 and #4 are non-runnable
|
||||
rootQueue = getRootQueue();
|
||||
defaultQueue = getDefaultQueue();
|
||||
Set<ApplicationAttemptId> nonRunnables =
|
||||
Sets.newHashSet(
|
||||
attempt3.getAppAttemptId(),
|
||||
attempt4.getAppAttemptId());
|
||||
verifyRunnableAppsInParent(rootQueue, 2);
|
||||
verifyRunnableAppsInLeaf(defaultQueue, 2, nonRunnables);
|
||||
verifyRunningAndAcceptedApps(2, 2);
|
||||
}
|
||||
|
||||
private void testWhenSettingsExceeded() throws Exception {
|
||||
// Stop app #1
|
||||
unregisterAMandWaitForFinish(app1, am1, nm1);
|
||||
|
||||
// Launch app #3
|
||||
launchAMandWaitForRunning(app3, attempt3, nm1);
|
||||
|
||||
// Check that attempt #4 is still non-runnable
|
||||
verifyRunnableAppsInParent(rootQueue, 2);
|
||||
verifyRunnableAppsInLeaf(defaultQueue, 2,
|
||||
Collections.singleton(attempt4.getAppAttemptId()));
|
||||
verifyRunningAndAcceptedApps(2, 1);
|
||||
|
||||
// Stop app #2
|
||||
unregisterAMandWaitForFinish(app2, am2, nm1);
|
||||
|
||||
// Launch app #4
|
||||
launchAMandWaitForRunning(app4, attempt4, nm1);
|
||||
verifyRunnableAppsInParent(rootQueue, 2);
|
||||
verifyRunnableAppsInLeaf(defaultQueue, 2,
|
||||
Collections.emptySet());
|
||||
verifyRunningAndAcceptedApps(2, 0);
|
||||
}
|
||||
|
||||
@SuppressWarnings("checkstyle:hiddenfield")
|
||||
private LeafQueue getDefaultQueue() {
|
||||
CSQueue defaultQueue =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getQueue("default");
|
||||
|
||||
return (LeafQueue) defaultQueue;
|
||||
}
|
||||
|
||||
private ParentQueue getRootQueue() {
|
||||
CSQueue root =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getQueue("root");
|
||||
|
||||
return (ParentQueue) root;
|
||||
}
|
||||
|
||||
private void verifyRunnableAppsInParent(ParentQueue queue,
|
||||
int expectedRunnable) {
|
||||
assertEquals("Num of runnable apps", expectedRunnable,
|
||||
queue.getNumRunnableApps());
|
||||
}
|
||||
|
||||
private void verifyRunnableAppsInLeaf(LeafQueue queue, int expectedRunnable,
|
||||
Set<ApplicationAttemptId> nonRunnableIds) {
|
||||
assertEquals("Num of runnable apps", expectedRunnable,
|
||||
queue.getNumRunnableApps());
|
||||
|
||||
queue.getCopyOfNonRunnableAppSchedulables()
|
||||
.stream()
|
||||
.map(fca -> fca.getApplicationAttemptId())
|
||||
.forEach(id -> assertTrue(id + " not found as non-runnable",
|
||||
nonRunnableIds.contains(id)));
|
||||
}
|
||||
|
||||
private void verifyRunningAndAcceptedApps(int expectedRunning,
|
||||
int expectedAccepted) throws YarnException {
|
||||
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
|
||||
|
||||
GetApplicationsResponse resp =
|
||||
rm.getClientRMService().getApplications(request);
|
||||
|
||||
List<ApplicationReport> apps = resp.getApplicationList();
|
||||
|
||||
long runningCount = apps
|
||||
.stream()
|
||||
.filter(report ->
|
||||
report.getYarnApplicationState() == YarnApplicationState.RUNNING)
|
||||
.count();
|
||||
|
||||
long acceptedCount = apps
|
||||
.stream()
|
||||
.filter(report ->
|
||||
report.getYarnApplicationState() == YarnApplicationState.ACCEPTED)
|
||||
.count();
|
||||
|
||||
assertEquals("Running apps count", expectedRunning, runningCount);
|
||||
assertEquals("Accepted apps count", expectedAccepted, acceptedCount);
|
||||
}
|
||||
|
||||
private void unregisterAMandWaitForFinish(RMApp app, MockAM am, MockNM nm)
|
||||
throws Exception {
|
||||
am.unregisterAppAttempt();
|
||||
nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
|
||||
RMAppAttemptState.FINISHED);
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private MockAM launchAMandWaitForRunning(RMApp app, RMAppAttempt attempt,
|
||||
MockNM nm) throws Exception {
|
||||
nm.nodeHeartbeat(true);
|
||||
((AbstractYarnScheduler)rm.getResourceScheduler()).update();
|
||||
rm.drainEvents();
|
||||
nm.nodeHeartbeat(true);
|
||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||
am.registerAppAttempt();
|
||||
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
|
||||
|
||||
return am;
|
||||
}
|
||||
}
|
|
@ -456,6 +456,8 @@ public class TestLeafQueue {
|
|||
|
||||
@Test
|
||||
public void testAppAttemptMetrics() throws Exception {
|
||||
CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class);
|
||||
cs.setMaxRunningAppsEnforcer(enforcer);
|
||||
|
||||
// Manipulate queue 'a'
|
||||
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
|
||||
|
|
|
@ -218,6 +218,7 @@ public class TestQueueState {
|
|||
CommonNodeLabelsManager.NO_LABEL);
|
||||
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
|
||||
.thenCallRealMethod();
|
||||
when(application.isRunnable()).thenReturn(true);
|
||||
return application;
|
||||
}
|
||||
|
||||
|
|
|
@ -157,6 +157,7 @@ public class TestQueueStateManager {
|
|||
CommonNodeLabelsManager.NO_LABEL);
|
||||
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class)))
|
||||
.thenCallRealMethod();
|
||||
when(application.isRunnable()).thenReturn(true);
|
||||
return application;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue