YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf queues. (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1546623 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-11-29 19:02:41 +00:00
parent 9da451cac5
commit 7545d8bf99
13 changed files with 323 additions and 139 deletions

View File

@ -126,6 +126,9 @@ Release 2.3.0 - UNRELEASED
YARN-1239. Modified ResourceManager state-store implementations to start YARN-1239. Modified ResourceManager state-store implementations to start
storing version numbers. (Jian He via vinodkv) storing version numbers. (Jian He via vinodkv)
YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf
queues. (Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -51,7 +51,6 @@ public class AppSchedulable extends Schedulable {
private FairScheduler scheduler; private FairScheduler scheduler;
private FSSchedulerApp app; private FSSchedulerApp app;
private Resource demand = Resources.createResource(0); private Resource demand = Resources.createResource(0);
private boolean runnable = false; // everyone starts as not runnable
private long startTime; private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class); private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
@ -61,7 +60,7 @@ public class AppSchedulable extends Schedulable {
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) { public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler; this.scheduler = scheduler;
this.app = app; this.app = app;
this.startTime = System.currentTimeMillis(); this.startTime = scheduler.getClock().getTime();
this.queue = queue; this.queue = queue;
this.containerTokenSecretManager = scheduler. this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager(); getContainerTokenSecretManager();
@ -138,18 +137,6 @@ public class AppSchedulable extends Schedulable {
return p; return p;
} }
/**
* Is this application runnable? Runnable means that the user and queue
* application counts are within configured quotas.
*/
public boolean getRunnable() {
return runnable;
}
public void setRunnable(boolean runnable) {
this.runnable = runnable;
}
/** /**
* Create and return a container object reflecting an allocation for the * Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and * given appliction on the given node with the given capability and
@ -281,9 +268,6 @@ public class AppSchedulable extends Schedulable {
unreserve(priority, node); unreserve(priority, node);
return Resources.none(); return Resources.none();
} }
} else {
// If this app is over quota, don't schedule anything
if (!(getRunnable())) { return Resources.none(); }
} }
Collection<Priority> prioritiesToTry = (reserved) ? Collection<Priority> prioritiesToTry = (reserved) ?

View File

@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -42,7 +41,9 @@ public class FSLeafQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName()); FSLeafQueue.class.getName());
private final List<AppSchedulable> appScheds = private final List<AppSchedulable> runnableAppScheds = // apps that are runnable
new ArrayList<AppSchedulable>();
private final List<AppSchedulable> nonRunnableAppScheds =
new ArrayList<AppSchedulable>(); new ArrayList<AppSchedulable>();
private final FairScheduler scheduler; private final FairScheduler scheduler;
@ -62,29 +63,51 @@ public class FSLeafQueue extends FSQueue {
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
} }
public void addApp(FSSchedulerApp app) { public void addApp(FSSchedulerApp app, boolean runnable) {
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this); AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable); app.setAppSchedulable(appSchedulable);
appScheds.add(appSchedulable); if (runnable) {
runnableAppScheds.add(appSchedulable);
} else {
nonRunnableAppScheds.add(appSchedulable);
}
} }
// for testing // for testing
void addAppSchedulable(AppSchedulable appSched) { void addAppSchedulable(AppSchedulable appSched) {
appScheds.add(appSched); runnableAppScheds.add(appSched);
} }
public void removeApp(FSSchedulerApp app) { /**
for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) { * Removes the given app from this queue.
AppSchedulable appSched = it.next(); * @return whether or not the app was runnable
if (appSched.getApp() == app) { */
it.remove(); public boolean removeApp(FSSchedulerApp app) {
break; if (runnableAppScheds.remove(app.getAppSchedulable())) {
} return true;
} else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
return false;
} else {
throw new IllegalStateException("Given app to remove " + app +
" does not exist in queue " + this);
} }
} }
public Collection<AppSchedulable> getAppSchedulables() { public void makeAppRunnable(AppSchedulable appSched) {
return appScheds; if (!nonRunnableAppScheds.remove(appSched)) {
throw new IllegalStateException("Can't make app runnable that does not " +
"already exist in queue as non-runnable" + appSched);
}
runnableAppScheds.add(appSched);
}
public Collection<AppSchedulable> getRunnableAppSchedulables() {
return runnableAppScheds;
}
public List<AppSchedulable> getNonRunnableAppSchedulables() {
return nonRunnableAppScheds;
} }
@Override @Override
@ -98,7 +121,7 @@ public class FSLeafQueue extends FSQueue {
@Override @Override
public void recomputeShares() { public void recomputeShares() {
policy.computeShares(getAppSchedulables(), getFairShare()); policy.computeShares(getRunnableAppSchedulables(), getFairShare());
} }
@Override @Override
@ -109,7 +132,10 @@ public class FSLeafQueue extends FSQueue {
@Override @Override
public Resource getResourceUsage() { public Resource getResourceUsage() {
Resource usage = Resources.createResource(0); Resource usage = Resources.createResource(0);
for (AppSchedulable app : appScheds) { for (AppSchedulable app : runnableAppScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
for (AppSchedulable app : nonRunnableAppScheds) {
Resources.addTo(usage, app.getResourceUsage()); Resources.addTo(usage, app.getResourceUsage());
} }
return usage; return usage;
@ -121,19 +147,17 @@ public class FSLeafQueue extends FSQueue {
// Limit demand to maxResources // Limit demand to maxResources
Resource maxRes = queueMgr.getMaxResources(getName()); Resource maxRes = queueMgr.getMaxResources(getName());
demand = Resources.createResource(0); demand = Resources.createResource(0);
for (AppSchedulable sched : appScheds) { for (AppSchedulable sched : runnableAppScheds) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) {
LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
+ "; Total resource consumption for " + getName() + " now "
+ demand);
}
demand = Resources.add(demand, toAdd);
demand = Resources.componentwiseMin(demand, maxRes);
if (Resources.equals(demand, maxRes)) { if (Resources.equals(demand, maxRes)) {
break; break;
} }
updateDemandForApp(sched, maxRes);
}
for (AppSchedulable sched : nonRunnableAppScheds) {
if (Resources.equals(demand, maxRes)) {
break;
}
updateDemandForApp(sched, maxRes);
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand LOG.debug("The updated demand for " + getName() + " is " + demand
@ -141,6 +165,18 @@ public class FSLeafQueue extends FSQueue {
} }
} }
private void updateDemandForApp(AppSchedulable sched, Resource maxRes) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) {
LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
+ "; Total resource consumption for " + getName() + " now "
+ demand);
}
demand = Resources.add(demand, toAdd);
demand = Resources.componentwiseMin(demand, maxRes);
}
@Override @Override
public Resource assignContainer(FSSchedulerNode node) { public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none(); Resource assigned = Resources.none();
@ -153,17 +189,15 @@ public class FSLeafQueue extends FSQueue {
} }
Comparator<Schedulable> comparator = policy.getComparator(); Comparator<Schedulable> comparator = policy.getComparator();
Collections.sort(appScheds, comparator); Collections.sort(runnableAppScheds, comparator);
for (AppSchedulable sched : appScheds) { for (AppSchedulable sched : runnableAppScheds) {
if (sched.getRunnable()) { if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) { continue;
continue; }
}
assigned = sched.assignContainer(node); assigned = sched.assignContainer(node);
if (!assigned.equals(Resources.none())) { if (!assigned.equals(Resources.none())) {
break; break;
}
} }
} }
return assigned; return assigned;
@ -205,4 +239,9 @@ public class FSLeafQueue extends FSQueue {
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
} }
@Override
public int getNumRunnableApps() {
return runnableAppScheds.size();
}
} }

View File

@ -43,6 +43,7 @@ public class FSParentQueue extends FSQueue {
new ArrayList<FSQueue>(); new ArrayList<FSQueue>();
private final QueueManager queueMgr; private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0); private Resource demand = Resources.createResource(0);
private int runnableApps;
public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler, public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
FSParentQueue parent) { FSParentQueue parent) {
@ -171,4 +172,17 @@ public class FSParentQueue extends FSQueue {
} }
super.policy = policy; super.policy = policy;
} }
public void incrementRunnableApps() {
runnableApps++;
}
public void decrementRunnableApps() {
runnableApps--;
}
@Override
public int getNumRunnableApps() {
return runnableApps;
}
} }

View File

@ -73,6 +73,10 @@ public abstract class FSQueue extends Schedulable implements Queue {
return policy; return policy;
} }
public FSParentQueue getParent() {
return parent;
}
protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy) protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
throws AllocationConfigurationException { throws AllocationConfigurationException {
throw new AllocationConfigurationException("SchedulingPolicy " + policy throw new AllocationConfigurationException("SchedulingPolicy " + policy
@ -164,6 +168,12 @@ public abstract class FSQueue extends Schedulable implements Queue {
*/ */
public abstract Collection<FSQueue> getChildQueues(); public abstract Collection<FSQueue> getChildQueues();
/**
* Return the number of apps for which containers can be allocated.
* Includes apps in subqueues.
*/
public abstract int getNumRunnableApps();
/** /**
* Helper method to check if the queue should attempt assigning resources * Helper method to check if the queue should attempt assigning resources
* *

View File

@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -62,7 +61,7 @@ public class FSSchedulerApp extends SchedulerApplication {
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>(); final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) { RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext); super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
} }
@ -327,4 +326,9 @@ public class FSSchedulerApp extends SchedulerApplication {
public Set<RMContainer> getPreemptionContainers() { public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet(); return preemptionMap.keySet();
} }
@Override
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
}
} }

View File

@ -190,9 +190,13 @@ public class FairScheduler implements ResourceScheduler {
// heartbeat // heartbeat
protected int maxAssign; // Max containers to assign per heartbeat protected int maxAssign; // Max containers to assign per heartbeat
@VisibleForTesting
final MaxRunningAppsEnforcer maxRunningEnforcer;
public FairScheduler() { public FairScheduler() {
clock = new SystemClock(); clock = new SystemClock();
queueMgr = new QueueManager(this); queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(queueMgr);
} }
private void validateConf(Configuration conf) { private void validateConf(Configuration conf) {
@ -272,7 +276,6 @@ public class FairScheduler implements ResourceScheduler {
*/ */
protected synchronized void update() { protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption updatePreemptionVariables(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue(); FSQueue rootQueue = queueMgr.getRootQueue();
@ -377,7 +380,7 @@ public class FairScheduler implements ResourceScheduler {
for (FSLeafQueue sched : scheds) { for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity, if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
sched.getResourceUsage(), sched.getFairShare())) { sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as : sched.getAppSchedulables()) { for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) { for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c); runningContainers.add(c);
apps.put(c, as.getApp()); apps.put(c, as.getApp());
@ -505,63 +508,23 @@ public class FairScheduler implements ResourceScheduler {
return resToPreempt; return resToPreempt;
} }
/**
* This updates the runnability of all apps based on whether or not any
* users/queues have exceeded their capacity.
*/
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
}
// Create a list of sorted jobs in order of start time and priority
Collections.sort(apps, new FifoAppComparator());
// Mark jobs as runnable in order of start time and priority, until
// user or queue limits have been reached.
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
for (AppSchedulable app : apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
int queueCount = queueApps.containsKey(queue) ? queueApps.get(queue) : 0;
if (userCount < queueMgr.getUserMaxApps(user) &&
queueCount < queueMgr.getQueueMaxApps(queue)) {
userApps.put(user, userCount + 1);
queueApps.put(queue, queueCount + 1);
app.setRunnable(true);
}
}
}
public RMContainerTokenSecretManager getContainerTokenSecretManager() { public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager(); return rmContext.getContainerTokenSecretManager();
} }
// synchronized for sizeBasedWeight // synchronized for sizeBasedWeight
public synchronized ResourceWeights getAppWeight(AppSchedulable app) { public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) { double weight = 1.0;
// Job won't launch tasks, but don't return 0 to avoid division errors if (sizeBasedWeight) {
return ResourceWeights.NEUTRAL; // Set weight based on current memory demand
} else { weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current memory demand
weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
}
weight *= app.getPriority().getPriority();
if (weightAdjuster != null) {
// Run weight through the user-supplied weightAdjuster
weight = weightAdjuster.adjustWeight(app, weight);
}
return new ResourceWeights((float)weight);
} }
weight *= app.getPriority().getPriority();
if (weightAdjuster != null) {
// Run weight through the user-supplied weightAdjuster
weight = weightAdjuster.adjustWeight(app, weight);
}
return new ResourceWeights((float)weight);
} }
@Override @Override
@ -662,7 +625,14 @@ public class FairScheduler implements ResourceScheduler {
return; return;
} }
queue.addApp(schedulerApp); boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
queue.addApp(schedulerApp, runnable);
if (runnable) {
maxRunningEnforcer.trackRunnableApp(schedulerApp);
} else {
maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
}
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId()); queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp); applications.put(applicationAttemptId, schedulerApp);
@ -736,7 +706,13 @@ public class FairScheduler implements ResourceScheduler {
// Inform the queue // Inform the queue
FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue() FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
.getQueueName(), false); .getQueueName(), false);
queue.removeApp(application); boolean wasRunnable = queue.removeApp(application);
if (wasRunnable) {
maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
} else {
maxRunningEnforcer.untrackNonRunnableApp(application);
}
// Remove from our data-structure // Remove from our data-structure
applications.remove(applicationAttemptId); applications.remove(applicationAttemptId);

View File

@ -89,7 +89,8 @@ public class QueueManager {
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>(); private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private FSParentQueue rootQueue; private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo(); @VisibleForTesting
volatile QueueManagerInfo info = new QueueManagerInfo();
@VisibleForTesting @VisibleForTesting
volatile QueuePlacementPolicy placementPolicy; volatile QueuePlacementPolicy placementPolicy;

View File

@ -39,7 +39,7 @@ public class FairSchedulerLeafQueueInfo extends FairSchedulerQueueInfo {
public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) { public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
super(queue, scheduler); super(queue, scheduler);
Collection<AppSchedulable> apps = queue.getAppSchedulables(); Collection<AppSchedulable> apps = queue.getRunnableAppSchedulables();
for (AppSchedulable app : apps) { for (AppSchedulable app : apps) {
if (app.getApp().isPending()) { if (app.getApp().isPending()) {
numPendingApps++; numPendingApps++;
@ -47,6 +47,7 @@ public class FairSchedulerLeafQueueInfo extends FairSchedulerQueueInfo {
numActiveApps++; numActiveApps++;
} }
} }
numPendingApps += queue.getNonRunnableAppSchedulables().size();
} }
public int getNumActiveApplications() { public int getNumActiveApplications() {

View File

@ -596,23 +596,24 @@ public class TestCapacityScheduler {
public void testConcurrentAccessOnApplications() throws Exception { public void testConcurrentAccessOnApplications() throws Exception {
CapacityScheduler cs = new CapacityScheduler(); CapacityScheduler cs = new CapacityScheduler();
verifyConcurrentAccessOnApplications( verifyConcurrentAccessOnApplications(
cs.applications, FiCaSchedulerApp.class); cs.applications, FiCaSchedulerApp.class, Queue.class);
} }
public static <T extends SchedulerApplication> public static <T extends SchedulerApplication, Q extends Queue>
void verifyConcurrentAccessOnApplications( void verifyConcurrentAccessOnApplications(
final Map<ApplicationAttemptId, T> applications, Class<T> clazz) final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
final Class<Q> queueClazz)
throws Exception { throws Exception {
final int size = 10000; final int size = 10000;
final ApplicationId appId = ApplicationId.newInstance(0, 0); final ApplicationId appId = ApplicationId.newInstance(0, 0);
final Constructor<T> ctor = clazz.getDeclaredConstructor( final Constructor<T> ctor = appClazz.getDeclaredConstructor(
ApplicationAttemptId.class, String.class, Queue.class, ApplicationAttemptId.class, String.class, queueClazz,
ActiveUsersManager.class, RMContext.class); ActiveUsersManager.class, RMContext.class);
ApplicationAttemptId appAttemptId0 ApplicationAttemptId appAttemptId0
= ApplicationAttemptId.newInstance(appId, 0); = ApplicationAttemptId.newInstance(appId, 0);
applications.put(appAttemptId0, ctor.newInstance( applications.put(appAttemptId0, ctor.newInstance(
appAttemptId0, null, mock(Queue.class), null, null)); appAttemptId0, null, mock(queueClazz), null, null));
assertNotNull(applications.get(appAttemptId0)); assertNotNull(applications.get(appAttemptId0));
// Imitating the thread of scheduler that will add and remove apps // Imitating the thread of scheduler that will add and remove apps
@ -627,7 +628,7 @@ public class TestCapacityScheduler {
= ApplicationAttemptId.newInstance(appId, i); = ApplicationAttemptId.newInstance(appId, i);
try { try {
applications.put(appAttemptId, ctor.newInstance( applications.put(appAttemptId, ctor.newInstance(
appAttemptId, null, mock(Queue.class), null, null)); appAttemptId, null, mock(queueClazz), null, null));
} catch (Exception e) { } catch (Exception e) {
failed.set(true); failed.set(true);
finished.set(true); finished.set(true);

View File

@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -53,7 +52,7 @@ public class TestFSSchedulerApp {
@Test @Test
public void testDelayScheduling() { public void testDelayScheduling() {
Queue queue = Mockito.mock(Queue.class); FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class); Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1); Mockito.when(prio.getPriority()).thenReturn(1);
double nodeLocalityThreshold = .5; double nodeLocalityThreshold = .5;
@ -110,7 +109,7 @@ public class TestFSSchedulerApp {
@Test @Test
public void testDelaySchedulingForContinuousScheduling() public void testDelaySchedulingForContinuousScheduling()
throws InterruptedException { throws InterruptedException {
Queue queue = Mockito.mock(Queue.class); FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class); Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1); Mockito.when(prio.getPriority()).thenReturn(1);
@ -170,7 +169,7 @@ public class TestFSSchedulerApp {
* no tin use), the least restrictive locality level is returned. * no tin use), the least restrictive locality level is returned.
*/ */
public void testLocalityLevelWithoutDelays() { public void testLocalityLevelWithoutDelays() {
Queue queue = Mockito.mock(Queue.class); FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class); Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1); Mockito.when(prio.getPriority()).thenReturn(1);

View File

@ -100,7 +100,7 @@ import com.google.common.collect.Sets;
public class TestFairScheduler { public class TestFairScheduler {
private class MockClock implements Clock { static class MockClock implements Clock {
private long time = 0; private long time = 0;
@Override @Override
public long getTime() { public long getTime() {
@ -613,9 +613,9 @@ public class TestFairScheduler {
appAttemptId, "default", "user1"); appAttemptId, "default", "user1");
scheduler.handle(appAddedEvent); scheduler.handle(appAddedEvent);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size()); .getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true) assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
.getAppSchedulables().size()); .getRunnableAppSchedulables().size());
assertEquals("root.user1", rmApp.getQueue()); assertEquals("root.user1", rmApp.getQueue());
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
@ -625,11 +625,11 @@ public class TestFairScheduler {
createAppAttemptId(2, 1), "default", "user2"); createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2); scheduler.handle(appAddedEvent2);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size()); .getRunnableAppSchedulables().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true) assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
.getAppSchedulables().size()); .getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user2", true) assertEquals(0, scheduler.getQueueManager().getLeafQueue("user2", true)
.getAppSchedulables().size()); .getRunnableAppSchedulables().size());
} }
@Test @Test
@ -821,7 +821,7 @@ public class TestFairScheduler {
// That queue should have one app // That queue should have one app
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true) assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size()); .getRunnableAppSchedulables().size());
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent( AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED); createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
@ -831,7 +831,7 @@ public class TestFairScheduler {
// Queue should have no apps // Queue should have no apps
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true) assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
.getAppSchedulables().size()); .getRunnableAppSchedulables().size());
} }
@Test @Test
@ -2400,7 +2400,158 @@ public class TestFairScheduler {
public void testConcurrentAccessOnApplications() throws Exception { public void testConcurrentAccessOnApplications() throws Exception {
FairScheduler fs = new FairScheduler(); FairScheduler fs = new FairScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications( TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.applications, FSSchedulerApp.class); fs.applications, FSSchedulerApp.class, FSLeafQueue.class);
}
private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
FSSchedulerApp app = scheduler.applications.get(attId);
FSLeafQueue queue = app.getQueue();
Collection<AppSchedulable> runnableApps =
queue.getRunnableAppSchedulables();
Collection<AppSchedulable> nonRunnableApps =
queue.getNonRunnableAppSchedulables();
assertEquals(runnable, runnableApps.contains(app.getAppSchedulable()));
assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable()));
}
private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue,
int numNonRunnableInQueue) {
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(queueName, false);
assertEquals(numRunnableInQueue,
queue.getRunnableAppSchedulables().size());
assertEquals(numNonRunnableInQueue,
queue.getNonRunnableAppSchedulables().size());
}
@Test
public void testUserAndQueueMaxRunningApps() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queue1\">");
out.println("<maxRunningApps>2</maxRunningApps>");
out.println("</queue>");
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>1</maxRunningApps>");
out.println("</user>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
verifyAppRunnable(attId1, true);
verifyQueueNumRunnable("queue1", 1, 0);
// exceeds user limit
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue2", "user1");
verifyAppRunnable(attId2, false);
verifyQueueNumRunnable("queue2", 0, 1);
// exceeds no limits
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", "user2");
verifyAppRunnable(attId3, true);
verifyQueueNumRunnable("queue1", 2, 0);
// exceeds queue limit
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", "user2");
verifyAppRunnable(attId4, false);
verifyQueueNumRunnable("queue1", 2, 1);
// Remove app 1 and both app 2 and app 4 should becomes runnable in its place
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
attId1, RMAppAttemptState.FINISHED);
scheduler.handle(appRemovedEvent1);
verifyAppRunnable(attId2, true);
verifyQueueNumRunnable("queue2", 1, 0);
verifyAppRunnable(attId4, true);
verifyQueueNumRunnable("queue1", 2, 0);
// A new app to queue1 should not be runnable
ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1", "user2");
verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1", 2, 1);
}
@Test
public void testMaxRunningAppsHierarchicalQueues() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queue1\">");
out.println(" <maxRunningApps>3</maxRunningApps>");
out.println(" <queue name=\"sub1\"></queue>");
out.println(" <queue name=\"sub2\"></queue>");
out.println(" <queue name=\"sub3\">");
out.println(" <maxRunningApps>1</maxRunningApps>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// exceeds no limits
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
verifyAppRunnable(attId1, true);
verifyQueueNumRunnable("queue1.sub1", 1, 0);
clock.tick(10);
// exceeds no limits
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", "user1");
verifyAppRunnable(attId2, true);
verifyQueueNumRunnable("queue1.sub3", 1, 0);
clock.tick(10);
// exceeds no limits
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1.sub2", "user1");
verifyAppRunnable(attId3, true);
verifyQueueNumRunnable("queue1.sub2", 1, 0);
clock.tick(10);
// exceeds queue1 limit
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", "user1");
verifyAppRunnable(attId4, false);
verifyQueueNumRunnable("queue1.sub2", 1, 1);
clock.tick(10);
// exceeds sub3 limit
ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1.sub3", "user1");
verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1.sub3", 1, 1);
clock.tick(10);
// Even though the app was removed from sub3, the app from sub2 gets to go
// because it came in first
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
attId2, RMAppAttemptState.FINISHED);
scheduler.handle(appRemovedEvent1);
verifyAppRunnable(attId4, true);
verifyQueueNumRunnable("queue1.sub2", 2, 0);
verifyAppRunnable(attId5, false);
verifyQueueNumRunnable("queue1.sub3", 0, 1);
// Now test removal of a non-runnable app
AppRemovedSchedulerEvent appRemovedEvent2 = new AppRemovedSchedulerEvent(
attId5, RMAppAttemptState.KILLED);
scheduler.handle(appRemovedEvent2);
assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps
.get("user1").size());
// verify app gone in queue accounting
verifyQueueNumRunnable("queue1.sub3", 0, 0);
// verify it doesn't become runnable when there would be space for it
AppRemovedSchedulerEvent appRemovedEvent3 = new AppRemovedSchedulerEvent(
attId4, RMAppAttemptState.FINISHED);
scheduler.handle(appRemovedEvent3);
verifyQueueNumRunnable("queue1.sub2", 1, 0);
verifyQueueNumRunnable("queue1.sub3", 0, 0);
} }
@Test (timeout = 10000) @Test (timeout = 10000)
@ -2499,23 +2650,23 @@ public class TestFairScheduler {
// Should get put into jerry // Should get put into jerry
createSchedulingRequest(1024, "jerry", "someuser"); createSchedulingRequest(1024, "jerry", "someuser");
assertEquals(1, jerryQueue.getAppSchedulables().size()); assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
// Should get forced into default // Should get forced into default
createSchedulingRequest(1024, "newqueue", "someuser"); createSchedulingRequest(1024, "newqueue", "someuser");
assertEquals(1, jerryQueue.getAppSchedulables().size()); assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
assertEquals(1, defaultQueue.getAppSchedulables().size()); assertEquals(1, defaultQueue.getRunnableAppSchedulables().size());
// Would get put into someuser because of user-as-default-queue, but should // Would get put into someuser because of user-as-default-queue, but should
// be forced into default // be forced into default
createSchedulingRequest(1024, "default", "someuser"); createSchedulingRequest(1024, "default", "someuser");
assertEquals(1, jerryQueue.getAppSchedulables().size()); assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
assertEquals(2, defaultQueue.getAppSchedulables().size()); assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
// Should get put into jerry because of user-as-default-queue // Should get put into jerry because of user-as-default-queue
createSchedulingRequest(1024, "default", "jerry"); createSchedulingRequest(1024, "default", "jerry");
assertEquals(2, jerryQueue.getAppSchedulables().size()); assertEquals(2, jerryQueue.getRunnableAppSchedulables().size());
assertEquals(2, defaultQueue.getAppSchedulables().size()); assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
} }
@SuppressWarnings("resource") @SuppressWarnings("resource")

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
@ -518,7 +519,7 @@ public class TestFifoScheduler {
public void testConcurrentAccessOnApplications() throws Exception { public void testConcurrentAccessOnApplications() throws Exception {
FifoScheduler fs = new FifoScheduler(); FifoScheduler fs = new FifoScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications( TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.applications, FiCaSchedulerApp.class); fs.applications, FiCaSchedulerApp.class, Queue.class);
} }
@SuppressWarnings("resource") @SuppressWarnings("resource")