YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf queues. (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1546626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-11-29 19:14:36 +00:00
parent db432fe76e
commit ce913ffd49
15 changed files with 777 additions and 139 deletions

View File

@ -108,6 +108,9 @@ Release 2.3.0 - UNRELEASED
YARN-1239. Modified ResourceManager state-store implementations to start
storing version numbers. (Jian He via vinodkv)
YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf
queues. (Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -51,7 +51,6 @@ public class AppSchedulable extends Schedulable {
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
private boolean runnable = false; // everyone starts as not runnable
private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
@ -61,7 +60,7 @@ public class AppSchedulable extends Schedulable {
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
this.startTime = System.currentTimeMillis();
this.startTime = scheduler.getClock().getTime();
this.queue = queue;
this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager();
@ -138,18 +137,6 @@ public class AppSchedulable extends Schedulable {
return p;
}
/**
* Is this application runnable? Runnable means that the user and queue
* application counts are within configured quotas.
*/
public boolean getRunnable() {
return runnable;
}
public void setRunnable(boolean runnable) {
this.runnable = runnable;
}
/**
* Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and
@ -281,9 +268,6 @@ public class AppSchedulable extends Schedulable {
unreserve(priority, node);
return Resources.none();
}
} else {
// If this app is over quota, don't schedule anything
if (!(getRunnable())) { return Resources.none(); }
}
Collection<Priority> prioritiesToTry = (reserved) ?

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
@ -42,7 +41,9 @@ public class FSLeafQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName());
private final List<AppSchedulable> appScheds =
private final List<AppSchedulable> runnableAppScheds = // apps that are runnable
new ArrayList<AppSchedulable>();
private final List<AppSchedulable> nonRunnableAppScheds =
new ArrayList<AppSchedulable>();
private final FairScheduler scheduler;
@ -62,29 +63,51 @@ public class FSLeafQueue extends FSQueue {
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
}
public void addApp(FSSchedulerApp app) {
public void addApp(FSSchedulerApp app, boolean runnable) {
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
appScheds.add(appSchedulable);
if (runnable) {
runnableAppScheds.add(appSchedulable);
} else {
nonRunnableAppScheds.add(appSchedulable);
}
}
// for testing
void addAppSchedulable(AppSchedulable appSched) {
appScheds.add(appSched);
runnableAppScheds.add(appSched);
}
public void removeApp(FSSchedulerApp app) {
for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
AppSchedulable appSched = it.next();
if (appSched.getApp() == app) {
it.remove();
break;
}
/**
* Removes the given app from this queue.
* @return whether or not the app was runnable
*/
public boolean removeApp(FSSchedulerApp app) {
if (runnableAppScheds.remove(app.getAppSchedulable())) {
return true;
} else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
return false;
} else {
throw new IllegalStateException("Given app to remove " + app +
" does not exist in queue " + this);
}
}
public Collection<AppSchedulable> getAppSchedulables() {
return appScheds;
public void makeAppRunnable(AppSchedulable appSched) {
if (!nonRunnableAppScheds.remove(appSched)) {
throw new IllegalStateException("Can't make app runnable that does not " +
"already exist in queue as non-runnable" + appSched);
}
runnableAppScheds.add(appSched);
}
public Collection<AppSchedulable> getRunnableAppSchedulables() {
return runnableAppScheds;
}
public List<AppSchedulable> getNonRunnableAppSchedulables() {
return nonRunnableAppScheds;
}
@Override
@ -98,7 +121,7 @@ public class FSLeafQueue extends FSQueue {
@Override
public void recomputeShares() {
policy.computeShares(getAppSchedulables(), getFairShare());
policy.computeShares(getRunnableAppSchedulables(), getFairShare());
}
@Override
@ -109,7 +132,10 @@ public class FSLeafQueue extends FSQueue {
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
for (AppSchedulable app : appScheds) {
for (AppSchedulable app : runnableAppScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
for (AppSchedulable app : nonRunnableAppScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
@ -121,7 +147,25 @@ public class FSLeafQueue extends FSQueue {
// Limit demand to maxResources
Resource maxRes = queueMgr.getMaxResources(getName());
demand = Resources.createResource(0);
for (AppSchedulable sched : appScheds) {
for (AppSchedulable sched : runnableAppScheds) {
if (Resources.equals(demand, maxRes)) {
break;
}
updateDemandForApp(sched, maxRes);
}
for (AppSchedulable sched : nonRunnableAppScheds) {
if (Resources.equals(demand, maxRes)) {
break;
}
updateDemandForApp(sched, maxRes);
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxRes);
}
}
private void updateDemandForApp(AppSchedulable sched, Resource maxRes) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) {
@ -131,14 +175,6 @@ public class FSLeafQueue extends FSQueue {
}
demand = Resources.add(demand, toAdd);
demand = Resources.componentwiseMin(demand, maxRes);
if (Resources.equals(demand, maxRes)) {
break;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxRes);
}
}
@Override
@ -153,9 +189,8 @@ public class FSLeafQueue extends FSQueue {
}
Comparator<Schedulable> comparator = policy.getComparator();
Collections.sort(appScheds, comparator);
for (AppSchedulable sched : appScheds) {
if (sched.getRunnable()) {
Collections.sort(runnableAppScheds, comparator);
for (AppSchedulable sched : runnableAppScheds) {
if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
continue;
}
@ -165,7 +200,6 @@ public class FSLeafQueue extends FSQueue {
break;
}
}
}
return assigned;
}
@ -205,4 +239,9 @@ public class FSLeafQueue extends FSQueue {
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
@Override
public int getNumRunnableApps() {
return runnableAppScheds.size();
}
}

View File

@ -43,6 +43,7 @@ public class FSParentQueue extends FSQueue {
new ArrayList<FSQueue>();
private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0);
private int runnableApps;
public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
FSParentQueue parent) {
@ -171,4 +172,17 @@ public class FSParentQueue extends FSQueue {
}
super.policy = policy;
}
public void incrementRunnableApps() {
runnableApps++;
}
public void decrementRunnableApps() {
runnableApps--;
}
@Override
public int getNumRunnableApps() {
return runnableApps;
}
}

View File

@ -73,6 +73,10 @@ public abstract class FSQueue extends Schedulable implements Queue {
return policy;
}
public FSParentQueue getParent() {
return parent;
}
protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
throws AllocationConfigurationException {
throw new AllocationConfigurationException("SchedulingPolicy " + policy
@ -164,6 +168,12 @@ public abstract class FSQueue extends Schedulable implements Queue {
*/
public abstract Collection<FSQueue> getChildQueues();
/**
* Return the number of apps for which containers can be allocated.
* Includes apps in subqueues.
*/
public abstract int getNumRunnableApps();
/**
* Helper method to check if the queue should attempt assigning resources
*

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -62,7 +61,7 @@ public class FSSchedulerApp extends SchedulerApplication {
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
}
@ -327,4 +326,9 @@ public class FSSchedulerApp extends SchedulerApplication {
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
@Override
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
}
}

View File

@ -190,9 +190,13 @@ public class FairScheduler implements ResourceScheduler {
// heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
@VisibleForTesting
final MaxRunningAppsEnforcer maxRunningEnforcer;
public FairScheduler() {
clock = new SystemClock();
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(queueMgr);
}
private void validateConf(Configuration conf) {
@ -272,7 +276,6 @@ public class FairScheduler implements ResourceScheduler {
*/
protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@ -377,7 +380,7 @@ public class FairScheduler implements ResourceScheduler {
for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as : sched.getAppSchedulables()) {
for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
@ -505,51 +508,12 @@ public class FairScheduler implements ResourceScheduler {
return resToPreempt;
}
/**
* This updates the runnability of all apps based on whether or not any
* users/queues have exceeded their capacity.
*/
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
}
// Create a list of sorted jobs in order of start time and priority
Collections.sort(apps, new FifoAppComparator());
// Mark jobs as runnable in order of start time and priority, until
// user or queue limits have been reached.
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
for (AppSchedulable app : apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
int queueCount = queueApps.containsKey(queue) ? queueApps.get(queue) : 0;
if (userCount < queueMgr.getUserMaxApps(user) &&
queueCount < queueMgr.getQueueMaxApps(queue)) {
userApps.put(user, userCount + 1);
queueApps.put(queue, queueCount + 1);
app.setRunnable(true);
}
}
}
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
// synchronized for sizeBasedWeight
public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
return ResourceWeights.NEUTRAL;
} else {
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current memory demand
@ -562,7 +526,6 @@ public class FairScheduler implements ResourceScheduler {
}
return new ResourceWeights((float)weight);
}
}
@Override
public Resource getMinimumResourceCapability() {
@ -662,7 +625,14 @@ public class FairScheduler implements ResourceScheduler {
return;
}
queue.addApp(schedulerApp);
boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
queue.addApp(schedulerApp, runnable);
if (runnable) {
maxRunningEnforcer.trackRunnableApp(schedulerApp);
} else {
maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
}
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
@ -736,7 +706,13 @@ public class FairScheduler implements ResourceScheduler {
// Inform the queue
FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
.getQueueName(), false);
queue.removeApp(application);
boolean wasRunnable = queue.removeApp(application);
if (wasRunnable) {
maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
} else {
maxRunningEnforcer.untrackNonRunnableApp(application);
}
// Remove from our data-structure
applications.remove(applicationAttemptId);

View File

@ -0,0 +1,302 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
/**
* Handles tracking and enforcement for user and queue maxRunningApps
* constraints
*/
public class MaxRunningAppsEnforcer {
private final QueueManager queueMgr;
// Tracks the number of running applications by user.
private final Map<String, Integer> usersNumRunnableApps;
@VisibleForTesting
final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
public MaxRunningAppsEnforcer(QueueManager queueMgr) {
this.queueMgr = queueMgr;
this.usersNumRunnableApps = new HashMap<String, Integer>();
this.usersNonRunnableApps = ArrayListMultimap.create();
}
/**
* Checks whether making the application runnable would exceed any
* maxRunningApps limits.
*/
public boolean canAppBeRunnable(FSQueue queue, String user) {
Integer userNumRunnable = usersNumRunnableApps.get(user);
if (userNumRunnable == null) {
userNumRunnable = 0;
}
if (userNumRunnable >= queueMgr.getUserMaxApps(user)) {
return false;
}
// Check queue and all parent queues
while (queue != null) {
int queueMaxApps = queueMgr.getQueueMaxApps(queue.getName());
if (queue.getNumRunnableApps() >= queueMaxApps) {
return false;
}
queue = queue.getParent();
}
return true;
}
/**
* Tracks the given new runnable app for purposes of maintaining max running
* app limits.
*/
public void trackRunnableApp(FSSchedulerApp app) {
String user = app.getUser();
FSLeafQueue queue = app.getQueue();
// Increment running counts for all parent queues
FSParentQueue parent = queue.getParent();
while (parent != null) {
parent.incrementRunnableApps();
parent = parent.getParent();
}
Integer userNumRunnable = usersNumRunnableApps.get(user);
usersNumRunnableApps.put(user, (userNumRunnable == null ? 0
: userNumRunnable) + 1);
}
/**
* Tracks the given new non runnable app so that it can be made runnable when
* it would not violate max running app limits.
*/
public void trackNonRunnableApp(FSSchedulerApp app) {
String user = app.getUser();
usersNonRunnableApps.put(user, app.getAppSchedulable());
}
/**
* Updates the relevant tracking variables after a runnable app with the given
* queue and user has been removed. Checks to see whether any other applications
* are now runnable and makes them so.
*
* Runs in O(n log(n)) where n is the number of queues that are under the
* highest queue that went from having no slack to having slack.
*/
public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
// Update usersRunnableApps
String user = app.getUser();
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
if (newUserNumRunning == 0) {
usersNumRunnableApps.remove(user);
} else {
usersNumRunnableApps.put(user, newUserNumRunning);
}
// Update runnable app bookkeeping for queues:
// childqueueX might have no pending apps itself, but if a queue higher up
// in the hierarchy parentqueueY has a maxRunningApps set, an app completion
// in childqueueX could allow an app in some other distant child of
// parentqueueY to become runnable.
// An app removal will only possibly allow another app to become runnable if
// the queue was already at its max before the removal.
// Thus we find the ancestor queue highest in the tree for which the app
// that was at its maxRunningApps before the removal.
FSLeafQueue queue = app.getQueue();
FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
queueMgr.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
FSParentQueue parent = queue.getParent();
while (parent != null) {
if (parent.getNumRunnableApps() == queueMgr.getQueueMaxApps(parent
.getName())) {
highestQueueWithAppsNowRunnable = parent;
}
parent.decrementRunnableApps();
parent = parent.getParent();
}
List<List<AppSchedulable>> appsNowMaybeRunnable =
new ArrayList<List<AppSchedulable>>();
// Compile lists of apps which may now be runnable
// We gather lists instead of building a set of all non-runnable apps so
// that this whole operation can be O(number of queues) instead of
// O(number of apps)
if (highestQueueWithAppsNowRunnable != null) {
gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
appsNowMaybeRunnable);
}
if (newUserNumRunning == queueMgr.getUserMaxApps(user) - 1) {
List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
if (userWaitingApps != null) {
appsNowMaybeRunnable.add(userWaitingApps);
}
}
// Scan through and check whether this means that any apps are now runnable
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
FSSchedulerApp prev = null;
int numNowRunnable = 0;
while (iter.hasNext()) {
FSSchedulerApp next = iter.next();
if (next == prev) {
continue;
}
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
trackRunnableApp(next);
AppSchedulable appSched = next.getAppSchedulable();
next.getQueue().makeAppRunnable(appSched);
if (!usersNonRunnableApps.remove(next.getUser(), appSched)) {
throw new IllegalStateException("Waiting app " + next
+ " expected to be in usersNonRunnableApps");
}
// No more than one app per list will be able to be made runnable, so
// we can stop looking after we've found that many
if (numNowRunnable >= appsNowMaybeRunnable.size()) {
break;
}
}
prev = next;
}
}
/**
* Stops tracking the given non-runnable app
*/
public void untrackNonRunnableApp(FSSchedulerApp app) {
usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable());
}
/**
* Traverses the queue hierarchy under the given queue to gather all lists
* of non-runnable applications.
*/
private void gatherPossiblyRunnableAppLists(FSQueue queue,
List<List<AppSchedulable>> appLists) {
if (queue.getNumRunnableApps() < queueMgr.getQueueMaxApps(queue.getName())) {
if (queue instanceof FSLeafQueue) {
appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables());
} else {
for (FSQueue child : queue.getChildQueues()) {
gatherPossiblyRunnableAppLists(child, appLists);
}
}
}
}
/**
* Takes a list of lists, each of which is ordered by start time, and returns
* their elements in order of start time.
*
* We maintain positions in each of the lists. Each next() call advances
* the position in one of the lists. We maintain a heap that orders lists
* by the start time of the app in the current position in that list.
* This allows us to pick which list to advance in O(log(num lists)) instead
* of O(num lists) time.
*/
private static class MultiListStartTimeIterator implements
Iterator<FSSchedulerApp> {
private List<AppSchedulable>[] appLists;
private int[] curPositionsInAppLists;
private PriorityQueue<IndexAndTime> appListsByCurStartTime;
@SuppressWarnings("unchecked")
public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) {
appLists = appListList.toArray(new List[appListList.size()]);
curPositionsInAppLists = new int[appLists.length];
appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
for (int i = 0; i < appLists.length; i++) {
long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0)
.getStartTime();
appListsByCurStartTime.add(new IndexAndTime(i, time));
}
}
@Override
public boolean hasNext() {
return !appListsByCurStartTime.isEmpty()
&& appListsByCurStartTime.peek().time != Long.MAX_VALUE;
}
@Override
public FSSchedulerApp next() {
IndexAndTime indexAndTime = appListsByCurStartTime.remove();
int nextListIndex = indexAndTime.index;
AppSchedulable next = appLists[nextListIndex]
.get(curPositionsInAppLists[nextListIndex]);
curPositionsInAppLists[nextListIndex]++;
if (curPositionsInAppLists[nextListIndex] < appLists[nextListIndex].size()) {
indexAndTime.time = appLists[nextListIndex]
.get(curPositionsInAppLists[nextListIndex]).getStartTime();
} else {
indexAndTime.time = Long.MAX_VALUE;
}
appListsByCurStartTime.add(indexAndTime);
return next.getApp();
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
private static class IndexAndTime implements Comparable<IndexAndTime> {
public int index;
public long time;
public IndexAndTime(int index, long time) {
this.index = index;
this.time = time;
}
@Override
public int compareTo(IndexAndTime o) {
return time < o.time ? -1 : (time > o.time ? 1 : 0);
}
@Override
public boolean equals(Object o) {
if (!(o instanceof IndexAndTime)) {
return false;
}
IndexAndTime other = (IndexAndTime)o;
return other.time == time;
}
@Override
public int hashCode() {
return (int)time;
}
}
}
}

View File

@ -89,7 +89,8 @@ public class QueueManager {
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo();
@VisibleForTesting
volatile QueueManagerInfo info = new QueueManagerInfo();
@VisibleForTesting
volatile QueuePlacementPolicy placementPolicy;

View File

@ -39,7 +39,7 @@ public class FairSchedulerLeafQueueInfo extends FairSchedulerQueueInfo {
public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
super(queue, scheduler);
Collection<AppSchedulable> apps = queue.getAppSchedulables();
Collection<AppSchedulable> apps = queue.getRunnableAppSchedulables();
for (AppSchedulable app : apps) {
if (app.getApp().isPending()) {
numPendingApps++;
@ -47,6 +47,7 @@ public class FairSchedulerLeafQueueInfo extends FairSchedulerQueueInfo {
numActiveApps++;
}
}
numPendingApps += queue.getNonRunnableAppSchedulables().size();
}
public int getNumActiveApplications() {

View File

@ -596,23 +596,24 @@ public class TestCapacityScheduler {
public void testConcurrentAccessOnApplications() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
verifyConcurrentAccessOnApplications(
cs.applications, FiCaSchedulerApp.class);
cs.applications, FiCaSchedulerApp.class, Queue.class);
}
public static <T extends SchedulerApplication>
public static <T extends SchedulerApplication, Q extends Queue>
void verifyConcurrentAccessOnApplications(
final Map<ApplicationAttemptId, T> applications, Class<T> clazz)
final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
final Class<Q> queueClazz)
throws Exception {
final int size = 10000;
final ApplicationId appId = ApplicationId.newInstance(0, 0);
final Constructor<T> ctor = clazz.getDeclaredConstructor(
ApplicationAttemptId.class, String.class, Queue.class,
final Constructor<T> ctor = appClazz.getDeclaredConstructor(
ApplicationAttemptId.class, String.class, queueClazz,
ActiveUsersManager.class, RMContext.class);
ApplicationAttemptId appAttemptId0
= ApplicationAttemptId.newInstance(appId, 0);
applications.put(appAttemptId0, ctor.newInstance(
appAttemptId0, null, mock(Queue.class), null, null));
appAttemptId0, null, mock(queueClazz), null, null));
assertNotNull(applications.get(appAttemptId0));
// Imitating the thread of scheduler that will add and remove apps
@ -627,7 +628,7 @@ public class TestCapacityScheduler {
= ApplicationAttemptId.newInstance(appId, i);
try {
applications.put(appAttemptId, ctor.newInstance(
appAttemptId, null, mock(Queue.class), null, null));
appAttemptId, null, mock(queueClazz), null, null));
} catch (Exception e) {
failed.set(true);
finished.set(true);

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
import org.mockito.Mockito;
@ -53,7 +52,7 @@ public class TestFSSchedulerApp {
@Test
public void testDelayScheduling() {
Queue queue = Mockito.mock(Queue.class);
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
double nodeLocalityThreshold = .5;
@ -110,7 +109,7 @@ public class TestFSSchedulerApp {
@Test
public void testDelaySchedulingForContinuousScheduling()
throws InterruptedException {
Queue queue = Mockito.mock(Queue.class);
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
@ -170,7 +169,7 @@ public class TestFSSchedulerApp {
* no tin use), the least restrictive locality level is returned.
*/
public void testLocalityLevelWithoutDelays() {
Queue queue = Mockito.mock(Queue.class);
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);

View File

@ -100,7 +100,7 @@ import com.google.common.collect.Sets;
public class TestFairScheduler {
private class MockClock implements Clock {
static class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
@ -613,9 +613,9 @@ public class TestFairScheduler {
appAttemptId, "default", "user1");
scheduler.handle(appAddedEvent);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size());
.getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
.getAppSchedulables().size());
.getRunnableAppSchedulables().size());
assertEquals("root.user1", rmApp.getQueue());
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
@ -625,11 +625,11 @@ public class TestFairScheduler {
createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size());
.getRunnableAppSchedulables().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
.getAppSchedulables().size());
.getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user2", true)
.getAppSchedulables().size());
.getRunnableAppSchedulables().size());
}
@Test
@ -821,7 +821,7 @@ public class TestFairScheduler {
// That queue should have one app
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size());
.getRunnableAppSchedulables().size());
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
@ -831,7 +831,7 @@ public class TestFairScheduler {
// Queue should have no apps
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size());
.getRunnableAppSchedulables().size());
}
@Test
@ -2400,7 +2400,158 @@ public class TestFairScheduler {
public void testConcurrentAccessOnApplications() throws Exception {
FairScheduler fs = new FairScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.applications, FSSchedulerApp.class);
fs.applications, FSSchedulerApp.class, FSLeafQueue.class);
}
private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
FSSchedulerApp app = scheduler.applications.get(attId);
FSLeafQueue queue = app.getQueue();
Collection<AppSchedulable> runnableApps =
queue.getRunnableAppSchedulables();
Collection<AppSchedulable> nonRunnableApps =
queue.getNonRunnableAppSchedulables();
assertEquals(runnable, runnableApps.contains(app.getAppSchedulable()));
assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable()));
}
private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue,
int numNonRunnableInQueue) {
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(queueName, false);
assertEquals(numRunnableInQueue,
queue.getRunnableAppSchedulables().size());
assertEquals(numNonRunnableInQueue,
queue.getNonRunnableAppSchedulables().size());
}
@Test
public void testUserAndQueueMaxRunningApps() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queue1\">");
out.println("<maxRunningApps>2</maxRunningApps>");
out.println("</queue>");
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>1</maxRunningApps>");
out.println("</user>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
verifyAppRunnable(attId1, true);
verifyQueueNumRunnable("queue1", 1, 0);
// exceeds user limit
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue2", "user1");
verifyAppRunnable(attId2, false);
verifyQueueNumRunnable("queue2", 0, 1);
// exceeds no limits
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", "user2");
verifyAppRunnable(attId3, true);
verifyQueueNumRunnable("queue1", 2, 0);
// exceeds queue limit
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", "user2");
verifyAppRunnable(attId4, false);
verifyQueueNumRunnable("queue1", 2, 1);
// Remove app 1 and both app 2 and app 4 should becomes runnable in its place
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
attId1, RMAppAttemptState.FINISHED);
scheduler.handle(appRemovedEvent1);
verifyAppRunnable(attId2, true);
verifyQueueNumRunnable("queue2", 1, 0);
verifyAppRunnable(attId4, true);
verifyQueueNumRunnable("queue1", 2, 0);
// A new app to queue1 should not be runnable
ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1", "user2");
verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1", 2, 1);
}
@Test
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queue1\">");
out.println(" <maxRunningApps>3</maxRunningApps>");
out.println(" <queue name=\"sub1\"></queue>");
out.println(" <queue name=\"sub2\"></queue>");
out.println(" <queue name=\"sub3\">");
out.println(" <maxRunningApps>1</maxRunningApps>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
verifyAppRunnable(attId1, true);
verifyQueueNumRunnable("queue1.sub1", 1, 0);
clock.tick(10);
// exceeds no limits
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", "user1");
verifyAppRunnable(attId2, true);
verifyQueueNumRunnable("queue1.sub3", 1, 0);
clock.tick(10);
// exceeds no limits
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1.sub2", "user1");
verifyAppRunnable(attId3, true);
verifyQueueNumRunnable("queue1.sub2", 1, 0);
clock.tick(10);
// exceeds queue1 limit
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", "user1");
verifyAppRunnable(attId4, false);
verifyQueueNumRunnable("queue1.sub2", 1, 1);
clock.tick(10);
// exceeds sub3 limit
ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1.sub3", "user1");
verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1.sub3", 1, 1);
clock.tick(10);
// Even though the app was removed from sub3, the app from sub2 gets to go
// because it came in first
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
attId2, RMAppAttemptState.FINISHED);
scheduler.handle(appRemovedEvent1);
verifyAppRunnable(attId4, true);
verifyQueueNumRunnable("queue1.sub2", 2, 0);
verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1.sub3", 0, 1);
// Now test removal of a non-runnable app
AppRemovedSchedulerEvent appRemovedEvent2 = new AppRemovedSchedulerEvent(
attId5, RMAppAttemptState.KILLED);
scheduler.handle(appRemovedEvent2);
assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps
.get("user1").size());
// verify app gone in queue accounting
verifyQueueNumRunnable("queue1.sub3", 0, 0);
// verify it doesn't become runnable when there would be space for it
AppRemovedSchedulerEvent appRemovedEvent3 = new AppRemovedSchedulerEvent(
attId4, RMAppAttemptState.FINISHED);
scheduler.handle(appRemovedEvent3);
verifyQueueNumRunnable("queue1.sub2", 1, 0);
verifyQueueNumRunnable("queue1.sub3", 0, 0);
}
@Test (timeout = 10000)
@ -2499,23 +2650,23 @@ public class TestFairScheduler {
// Should get put into jerry
createSchedulingRequest(1024, "jerry", "someuser");
assertEquals(1, jerryQueue.getAppSchedulables().size());
assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
// Should get forced into default
createSchedulingRequest(1024, "newqueue", "someuser");
assertEquals(1, jerryQueue.getAppSchedulables().size());
assertEquals(1, defaultQueue.getAppSchedulables().size());
assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
assertEquals(1, defaultQueue.getRunnableAppSchedulables().size());
// Would get put into someuser because of user-as-default-queue, but should
// be forced into default
createSchedulingRequest(1024, "default", "someuser");
assertEquals(1, jerryQueue.getAppSchedulables().size());
assertEquals(2, defaultQueue.getAppSchedulables().size());
assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
// Should get put into jerry because of user-as-default-queue
createSchedulingRequest(1024, "default", "jerry");
assertEquals(2, jerryQueue.getAppSchedulables().size());
assertEquals(2, defaultQueue.getAppSchedulables().size());
assertEquals(2, jerryQueue.getRunnableAppSchedulables().size());
assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
}
@SuppressWarnings("resource")

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Before;
import org.junit.Test;
public class TestMaxRunningAppsEnforcer {
private QueueManager queueManager;
private Map<String, Integer> queueMaxApps;
private Map<String, Integer> userMaxApps;
private MaxRunningAppsEnforcer maxAppsEnforcer;
private int appNum;
private TestFairScheduler.MockClock clock;
@Before
public void setup() throws Exception {
clock = new TestFairScheduler.MockClock();
FairScheduler scheduler = mock(FairScheduler.class);
when(scheduler.getConf()).thenReturn(
new FairSchedulerConfiguration(new Configuration()));
when(scheduler.getClock()).thenReturn(clock);
queueManager = new QueueManager(scheduler);
queueManager.initialize();
queueMaxApps = queueManager.info.queueMaxApps;
userMaxApps = queueManager.info.userMaxApps;
maxAppsEnforcer = new MaxRunningAppsEnforcer(queueManager);
appNum = 0;
}
private FSSchedulerApp addApp(FSLeafQueue queue, String user) {
ApplicationId appId = ApplicationId.newInstance(0l, appNum++);
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user);
FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null);
queue.addApp(app, runnable);
if (runnable) {
maxAppsEnforcer.trackRunnableApp(app);
} else {
maxAppsEnforcer.trackNonRunnableApp(app);
}
return app;
}
private void removeApp(FSSchedulerApp app) {
app.getQueue().removeApp(app);
maxAppsEnforcer.updateRunnabilityOnAppRemoval(app);
}
@Test
public void testRemoveDoesNotEnableAnyApp() {
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true);
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue2", true);
queueMaxApps.put("root", 2);
queueMaxApps.put("root.queue1", 1);
queueMaxApps.put("root.queue2", 1);
FSSchedulerApp app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
assertEquals(1, leaf1.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
removeApp(app1);
assertEquals(0, leaf1.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
}
@Test
public void testRemoveEnablesAppOnCousinQueue() {
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
queueMaxApps.put("root.queue1", 2);
FSSchedulerApp app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
assertEquals(1, leaf1.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
removeApp(app1);
assertEquals(0, leaf1.getRunnableAppSchedulables().size());
assertEquals(2, leaf2.getRunnableAppSchedulables().size());
assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
}
@Test
public void testRemoveEnablesOneByQueueOneByUser() {
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.leaf1", true);
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true);
queueMaxApps.put("root.queue1.leaf1", 2);
userMaxApps.put("user1", 1);
FSSchedulerApp app1 = addApp(leaf1, "user1");
addApp(leaf1, "user2");
addApp(leaf1, "user3");
addApp(leaf2, "user1");
assertEquals(2, leaf1.getRunnableAppSchedulables().size());
assertEquals(1, leaf1.getNonRunnableAppSchedulables().size());
assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
removeApp(app1);
assertEquals(2, leaf1.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getRunnableAppSchedulables().size());
assertEquals(0, leaf1.getNonRunnableAppSchedulables().size());
assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
}
@Test
public void testRemoveEnablingOrderedByStartTime() {
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
queueMaxApps.put("root.queue1", 2);
FSSchedulerApp app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
clock.tick(20);
addApp(leaf1, "user");
assertEquals(1, leaf1.getRunnableAppSchedulables().size());
assertEquals(1, leaf2.getRunnableAppSchedulables().size());
assertEquals(1, leaf1.getNonRunnableAppSchedulables().size());
assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
removeApp(app1);
assertEquals(0, leaf1.getRunnableAppSchedulables().size());
assertEquals(2, leaf2.getRunnableAppSchedulables().size());
assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
@ -518,7 +519,7 @@ public class TestFifoScheduler {
public void testConcurrentAccessOnApplications() throws Exception {
FifoScheduler fs = new FifoScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.applications, FiCaSchedulerApp.class);
fs.applications, FiCaSchedulerApp.class, Queue.class);
}
@SuppressWarnings("resource")