YARN-1499. Fair Scheduler changes for moving apps between queues (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1567484 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-02-12 01:19:42 +00:00
parent 5d53da64db
commit 9de407364d
8 changed files with 326 additions and 30 deletions

View File

@ -101,6 +101,9 @@ Release 2.4.0 - UNRELEASED
YARN-1504. RM changes for moving apps between queues (Sandy Ryza) YARN-1504. RM changes for moving apps between queues (Sandy Ryza)
YARN-1499. Fair Scheduler changes for moving apps between queues (Sandy
Ryza)
IMPROVEMENTS IMPROVEMENTS
YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via YARN-1007. Enhance History Reader interface for Containers. (Mayank Bansal via

View File

@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@Unstable @Unstable
public class SchedulerApplication { public class SchedulerApplication {
private final Queue queue; private Queue queue;
private final String user; private final String user;
private SchedulerApplicationAttempt currentAttempt; private SchedulerApplicationAttempt currentAttempt;
@ -37,6 +37,10 @@ public class SchedulerApplication {
public Queue getQueue() { public Queue getQueue() {
return queue; return queue;
} }
public void setQueue(Queue queue) {
this.queue = queue;
}
public String getUser() { public String getUser() {
return user; return user;

View File

@ -39,7 +39,8 @@ public class AllocationConfiguration {
// Minimum resource allocation for each queue // Minimum resource allocation for each queue
private final Map<String, Resource> minQueueResources; private final Map<String, Resource> minQueueResources;
// Maximum amount of resources per queue // Maximum amount of resources per queue
private final Map<String, Resource> maxQueueResources; @VisibleForTesting
final Map<String, Resource> maxQueueResources;
// Sharing weights for each queue // Sharing weights for each queue
private final Map<String, ResourceWeights> queueWeights; private final Map<String, ResourceWeights> queueWeights;

View File

@ -766,7 +766,9 @@ public class FairScheduler extends AbstractYarnScheduler {
boolean wasRunnable = queue.removeApp(attempt); boolean wasRunnable = queue.removeApp(attempt);
if (wasRunnable) { if (wasRunnable) {
maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt); maxRunningEnforcer.untrackRunnableApp(attempt);
maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt,
attempt.getQueue());
} else { } else {
maxRunningEnforcer.untrackNonRunnableApp(attempt); maxRunningEnforcer.untrackNonRunnableApp(attempt);
} }
@ -1355,4 +1357,119 @@ public class FairScheduler extends AbstractYarnScheduler {
queue.collectSchedulerApplications(apps); queue.collectSchedulerApplications(apps);
return apps; return apps;
} }
@Override
public synchronized String moveApplication(ApplicationId appId,
String queueName) throws YarnException {
SchedulerApplication app = applications.get(appId);
if (app == null) {
throw new YarnException("App to be moved " + appId + " not found.");
}
FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt();
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false);
if (targetQueue == null) {
throw new YarnException("Target queue " + queueName
+ " not found or is not a leaf queue.");
}
if (targetQueue == oldQueue) {
return oldQueue.getQueueName();
}
if (oldQueue.getRunnableAppSchedulables().contains(
attempt.getAppSchedulable())) {
verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
}
executeMove(app, attempt, oldQueue, targetQueue);
return targetQueue.getQueueName();
}
private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
String queueName = targetQueue.getQueueName();
ApplicationAttemptId appAttId = app.getApplicationAttemptId();
// When checking maxResources and maxRunningApps, only need to consider
// queues before the lowest common ancestor of the two queues because the
// total running apps in queues above will not be changed.
FSQueue lowestCommonAncestor = findLowestCommonAncestorQueue(oldQueue,
targetQueue);
Resource consumption = app.getCurrentConsumption();
// Check whether the move would go over maxRunningApps or maxShare
FSQueue cur = targetQueue;
while (cur != lowestCommonAncestor) {
// maxRunningApps
if (cur.getNumRunnableApps() == allocConf.getQueueMaxApps(cur.getQueueName())) {
throw new YarnException("Moving app attempt " + appAttId + " to queue "
+ queueName + " would violate queue maxRunningApps constraints on"
+ " queue " + cur.getQueueName());
}
// maxShare
if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption),
cur.getMaxShare())) {
throw new YarnException("Moving app attempt " + appAttId + " to queue "
+ queueName + " would violate queue maxShare constraints on"
+ " queue " + cur.getQueueName());
}
cur = cur.getParent();
}
}
/**
* Helper for moveApplication, which is synchronized, so all operations will
* be atomic.
*/
private void executeMove(SchedulerApplication app, FSSchedulerApp attempt,
FSLeafQueue oldQueue, FSLeafQueue newQueue) {
boolean wasRunnable = oldQueue.removeApp(attempt);
// if app was not runnable before, it may be runnable now
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,
attempt.getUser());
if (wasRunnable && !nowRunnable) {
throw new IllegalStateException("Should have already verified that app "
+ attempt.getApplicationId() + " would be runnable in new queue");
}
if (wasRunnable) {
maxRunningEnforcer.untrackRunnableApp(attempt);
} else if (nowRunnable) {
// App has changed from non-runnable to runnable
maxRunningEnforcer.untrackNonRunnableApp(attempt);
}
attempt.move(newQueue); // This updates all the metrics
app.setQueue(newQueue);
newQueue.addApp(attempt, nowRunnable);
if (nowRunnable) {
maxRunningEnforcer.trackRunnableApp(attempt);
}
if (wasRunnable) {
maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, oldQueue);
}
}
private FSQueue findLowestCommonAncestorQueue(FSQueue queue1, FSQueue queue2) {
// Because queue names include ancestors, separated by periods, we can find
// the lowest common ancestors by going from the start of the names until
// there's a character that doesn't match.
String name1 = queue1.getName();
String name2 = queue2.getName();
// We keep track of the last period we encounter to avoid returning root.apple
// when the queues are root.applepie and root.appletart
int lastPeriodIndex = -1;
for (int i = 0; i < Math.max(name1.length(), name2.length()); i++) {
if (name1.length() <= i || name2.length() <= i ||
name1.charAt(i) != name2.charAt(i)) {
return queueMgr.getQueue(name1.substring(lastPeriodIndex));
} else if (name1.charAt(i) == '.') {
lastPeriodIndex = i;
}
}
return queue1; // names are identical
}
} }

View File

@ -105,26 +105,15 @@ public class MaxRunningAppsEnforcer {
} }
/** /**
* Updates the relevant tracking variables after a runnable app with the given * Checks to see whether any other applications runnable now that the given
* queue and user has been removed. Checks to see whether any other applications * application has been removed from the given queue. And makes them so.
* are now runnable and makes them so.
* *
* Runs in O(n log(n)) where n is the number of queues that are under the * Runs in O(n log(n)) where n is the number of queues that are under the
* highest queue that went from having no slack to having slack. * highest queue that went from having no slack to having slack.
*/ */
public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) { public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration(); AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
// Update usersRunnableApps
String user = app.getUser();
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
if (newUserNumRunning == 0) {
usersNumRunnableApps.remove(user);
} else {
usersNumRunnableApps.put(user, newUserNumRunning);
}
// Update runnable app bookkeeping for queues:
// childqueueX might have no pending apps itself, but if a queue higher up // childqueueX might have no pending apps itself, but if a queue higher up
// in the hierarchy parentqueueY has a maxRunningApps set, an app completion // in the hierarchy parentqueueY has a maxRunningApps set, an app completion
// in childqueueX could allow an app in some other distant child of // in childqueueX could allow an app in some other distant child of
@ -133,16 +122,14 @@ public class MaxRunningAppsEnforcer {
// the queue was already at its max before the removal. // the queue was already at its max before the removal.
// Thus we find the ancestor queue highest in the tree for which the app // Thus we find the ancestor queue highest in the tree for which the app
// that was at its maxRunningApps before the removal. // that was at its maxRunningApps before the removal.
FSLeafQueue queue = app.getQueue();
FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() == FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null; allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
FSParentQueue parent = queue.getParent(); FSParentQueue parent = queue.getParent();
while (parent != null) { while (parent != null) {
if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent
.getName())) { .getName()) - 1) {
highestQueueWithAppsNowRunnable = parent; highestQueueWithAppsNowRunnable = parent;
} }
parent.decrementRunnableApps();
parent = parent.getParent(); parent = parent.getParent();
} }
@ -157,7 +144,12 @@ public class MaxRunningAppsEnforcer {
gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable, gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
appsNowMaybeRunnable); appsNowMaybeRunnable);
} }
if (newUserNumRunning == allocConf.getUserMaxApps(user) - 1) { String user = app.getUser();
Integer userNumRunning = usersNumRunnableApps.get(user);
if (userNumRunning == null) {
userNumRunning = 0;
}
if (userNumRunning == allocConf.getUserMaxApps(user) - 1) {
List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user); List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
if (userWaitingApps != null) { if (userWaitingApps != null) {
appsNowMaybeRunnable.add(userWaitingApps); appsNowMaybeRunnable.add(userWaitingApps);
@ -208,6 +200,29 @@ public class MaxRunningAppsEnforcer {
} }
} }
/**
* Updates the relevant tracking variables after a runnable app with the given
* queue and user has been removed.
*/
public void untrackRunnableApp(FSSchedulerApp app) {
// Update usersRunnableApps
String user = app.getUser();
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
if (newUserNumRunning == 0) {
usersNumRunnableApps.remove(user);
} else {
usersNumRunnableApps.put(user, newUserNumRunning);
}
// Update runnable app bookkeeping for queues
FSLeafQueue queue = app.getQueue();
FSParentQueue parent = queue.getParent();
while (parent != null) {
parent.decrementRunnableApps();
parent = parent.getParent();
}
}
/** /**
* Stops tracking the given non-runnable app * Stops tracking the given non-runnable app
*/ */

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -56,10 +57,12 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -2547,4 +2550,138 @@ public class TestFairScheduler {
TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler(
scheduler.getSchedulerApplications(), scheduler, "default"); scheduler.getSchedulerApplications(), scheduler, "default");
} }
@Test
public void testMoveRunnableApp() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
ApplicationId appId = appAttId.getApplicationId();
RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(1024));
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(nodeEvent);
scheduler.handle(updateEvent);
assertEquals(Resource.newInstance(1024, 1), oldQueue.getResourceUsage());
scheduler.update();
assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand());
scheduler.moveApplication(appId, "queue2");
FSSchedulerApp app = scheduler.getSchedulerApp(appAttId);
assertSame(targetQueue, app.getQueue());
assertFalse(oldQueue.getRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertTrue(targetQueue.getRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage());
assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage());
assertEquals(0, oldQueue.getNumRunnableApps());
assertEquals(1, targetQueue.getNumRunnableApps());
assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());
scheduler.update();
assertEquals(Resource.newInstance(0, 0), oldQueue.getDemand());
assertEquals(Resource.newInstance(3072, 3), targetQueue.getDemand());
}
@Test
public void testMoveNonRunnableApp() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue1", 0);
scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0);
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
assertEquals(0, oldQueue.getNumRunnableApps());
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
assertEquals(0, oldQueue.getNumRunnableApps());
assertEquals(0, targetQueue.getNumRunnableApps());
assertEquals(0, queueMgr.getRootQueue().getNumRunnableApps());
}
@Test
public void testMoveMakesAppRunnable() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true);
scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue1", 0);
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
FSSchedulerApp app = scheduler.getSchedulerApp(appAttId);
assertTrue(oldQueue.getNonRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
assertFalse(oldQueue.getNonRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertFalse(targetQueue.getNonRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertTrue(targetQueue.getRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertEquals(1, targetQueue.getNumRunnableApps());
assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());
}
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxAppsConstraints() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
queueMgr.getLeafQueue("queue2", true);
scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0);
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
}
@Test (expected = YarnException.class)
public void testMoveWouldViolateMaxResourcesConstraints() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true);
queueMgr.getLeafQueue("queue2", true);
scheduler.getAllocationConfiguration().maxQueueResources.put("root.queue2",
Resource.newInstance(1024, 1));
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2));
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
scheduler.handle(nodeEvent);
scheduler.handle(updateEvent);
scheduler.handle(updateEvent);
assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage());
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
}
@Test (expected = YarnException.class)
public void testMoveToNonexistentQueue() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().getLeafQueue("queue1", true);
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
}
} }

View File

@ -77,7 +77,8 @@ public class TestMaxRunningAppsEnforcer {
private void removeApp(FSSchedulerApp app) { private void removeApp(FSSchedulerApp app) {
app.getQueue().removeApp(app); app.getQueue().removeApp(app);
maxAppsEnforcer.updateRunnabilityOnAppRemoval(app); maxAppsEnforcer.untrackRunnableApp(app);
maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue());
} }
@Test @Test

View File

@ -349,16 +349,20 @@ Queue Access Control Lists (ACLs)
* {Administration} * {Administration}
The fair scheduler provides support for administration at runtime through two mechanisms: The fair scheduler provides support for administration at runtime through a few mechanisms:
* It is possible to modify minimum shares, limits, weights, preemption timeouts Modifying configuration at runtime
and queue scheduling policies at runtime by editing the allocation file. The
scheduler will reload this file 10-15 seconds after it sees that it was
modified.
* Current applications, queues, and fair shares can be examined through the It is possible to modify minimum shares, limits, weights, preemption timeouts
ResourceManager's web interface, at and queue scheduling policies at runtime by editing the allocation file. The
http://<ResourceManager URL>/cluster/scheduler. scheduler will reload this file 10-15 seconds after it sees that it was
modified.
Monitoring through web UI
Current applications, queues, and fair shares can be examined through the
ResourceManager's web interface, at
http://<ResourceManager URL>/cluster/scheduler.
The following fields can be seen for each queue on the web interface: The following fields can be seen for each queue on the web interface:
@ -382,3 +386,17 @@ Queue Access Control Lists (ACLs)
In addition to the information that the ResourceManager normally displays In addition to the information that the ResourceManager normally displays
about each application, the web interface includes the application's fair share. about each application, the web interface includes the application's fair share.
Moving applications between queues
The Fair Scheduler supports moving a running application to a different queue.
This can be useful for moving an important application to a higher priority
queue, or for moving an unimportant application to a lower priority queue.
Apps can be moved by running "yarn application -movetoqueue appID -queue
targetQueueName".
When an application is moved to a queue, its existing allocations become
counted with the new queue's allocations instead of the old for purposes
of determining fairness. An attempt to move an application to a queue will
fail if the addition of the app's resources to that queue would violate the
its maxRunningApps or maxResources constraints.