Merge -r 1407432:1407433 from trunk to branch. Fixes: YARN-183. Clean up fair scheduler code. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1407434 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2012-11-09 12:39:26 +00:00
parent 3d73261636
commit c6dd3c0263
10 changed files with 132 additions and 144 deletions

View File

@ -52,6 +52,8 @@ Release 2.0.3-alpha - Unreleased
YARN-136. Make ClientToAMTokenSecretManager part of RMContext (Vinod Kumar YARN-136. Make ClientToAMTokenSecretManager part of RMContext (Vinod Kumar
Vavilapalli via sseth) Vavilapalli via sseth)
YARN-183. Clean up fair scheduler code. (Sandy Ryza via tomwhite)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -101,7 +101,7 @@ public class AppSchedulable extends Schedulable {
@Override @Override
public Resource getResourceUsage() { public Resource getResourceUsage() {
return this.app.getCurrentConsumption(); return app.getCurrentConsumption();
} }
@ -114,7 +114,7 @@ public class AppSchedulable extends Schedulable {
* Get metrics reference from containing queue. * Get metrics reference from containing queue.
*/ */
public QueueMetrics getMetrics() { public QueueMetrics getMetrics() {
return this.queue.getQueueSchedulable().getMetrics(); return queue.getQueueSchedulable().getMetrics();
} }
@Override @Override
@ -190,9 +190,9 @@ public class AppSchedulable extends Schedulable {
RMContainer rmContainer = application.reserve(node, priority, null, RMContainer rmContainer = application.reserve(node, priority, null,
container); container);
node.reserveResource(application, priority, rmContainer); node.reserveResource(application, priority, rmContainer);
getMetrics().reserveResource(this.app.getUser(), getMetrics().reserveResource(app.getUser(),
container.getResource()); container.getResource());
scheduler.getRootQueueMetrics().reserveResource(this.app.getUser(), scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
container.getResource()); container.getResource());
} }
@ -257,13 +257,13 @@ public class AppSchedulable extends Schedulable {
// TODO this should subtract resource just assigned // TODO this should subtract resource just assigned
// TEMPROARY // TEMPROARY
getMetrics().setAvailableResourcesToQueue( getMetrics().setAvailableResourcesToQueue(
this.scheduler.getClusterCapacity()); scheduler.getClusterCapacity());
} }
// If we had previously made a reservation, delete it // If we had previously made a reservation, delete it
if (reserved) { if (reserved) {
this.unreserve(application, priority, node); unreserve(application, priority, node);
} }
// Inform the node // Inform the node
@ -290,7 +290,7 @@ public class AppSchedulable extends Schedulable {
// Make sure the application still needs requests at this priority // Make sure the application still needs requests at this priority
if (app.getTotalRequiredResources(priority) == 0) { if (app.getTotalRequiredResources(priority) == 0) {
this.unreserve(app, priority, node); unreserve(app, priority, node);
return Resources.none(); return Resources.none();
} }
} else { } else {

View File

@ -61,7 +61,7 @@ public class FSQueue {
queueSchedulable.addApp(appSchedulable); queueSchedulable.addApp(appSchedulable);
} }
public void removeJob(FSSchedulerApp app) { public void removeApp(FSSchedulerApp app) {
applications.remove(app); applications.remove(app);
queueSchedulable.removeApp(app); queueSchedulable.removeApp(app);
} }

View File

@ -80,7 +80,7 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
this.scheduler = scheduler; this.scheduler = scheduler;
this.queue = queue; this.queue = queue;
this.queueMgr = scheduler.getQueueManager(); this.queueMgr = scheduler.getQueueManager();
this.metrics = QueueMetrics.forQueue(this.getName(), null, true, scheduler.getConf()); this.metrics = QueueMetrics.forQueue(getName(), null, true, scheduler.getConf());
this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
} }
@ -113,7 +113,7 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
Resource toAdd = sched.getDemand(); Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Counting resource from " + sched.getName() + " " + toAdd LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
+ "; Total resource consumption for " + this.getName() + " now " + "; Total resource consumption for " + getName() + " now "
+ demand); + demand);
} }
demand = Resources.add(demand, toAdd); demand = Resources.add(demand, toAdd);
@ -123,7 +123,7 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
} }
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + this.getName() + " is " + demand LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxRes); + "; the max is " + maxRes);
} }
} }
@ -164,9 +164,9 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
@Override @Override
public Resource assignContainer(FSSchedulerNode node, boolean reserved) { public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
LOG.debug("Node offered to queue: " + this.getName() + " reserved: " + reserved); LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved);
// If this queue is over its limit, reject // If this queue is over its limit, reject
if (Resources.greaterThan(this.getResourceUsage(), if (Resources.greaterThan(getResourceUsage(),
queueMgr.getMaxResources(queue.getName()))) { queueMgr.getMaxResources(queue.getName()))) {
return Resources.none(); return Resources.none();
} }
@ -258,7 +258,7 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
@Override @Override
public Map<QueueACL, AccessControlList> getQueueAcls() { public Map<QueueACL, AccessControlList> getQueueAcls() {
Map<QueueACL, AccessControlList> acls = this.queueMgr.getQueueAcls(this.getName()); Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
return new HashMap<QueueACL, AccessControlList>(acls); return new HashMap<QueueACL, AccessControlList>(acls);
} }
@ -284,7 +284,7 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
recordFactory.newRecordInstance(QueueUserACLInfo.class); recordFactory.newRecordInstance(QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>(); List<QueueACL> operations = new ArrayList<QueueACL>();
for (QueueACL operation : QueueACL.values()) { for (QueueACL operation : QueueACL.values()) {
Map<QueueACL, AccessControlList> acls = this.queueMgr.getQueueAcls(this.getName()); Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
if (acls.get(operation).isUserAllowed(user)) { if (acls.get(operation).isUserAllowed(user)) {
operations.add(operation); operations.add(operation);
} }

View File

@ -112,12 +112,12 @@ public class FSSchedulerApp extends SchedulerApplication {
} }
public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
return this.appSchedulingInfo.getApplicationId(); return appSchedulingInfo.getApplicationId();
} }
@Override @Override
public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return this.appSchedulingInfo.getApplicationAttemptId(); return appSchedulingInfo.getApplicationAttemptId();
} }
public void setAppSchedulable(AppSchedulable appSchedulable) { public void setAppSchedulable(AppSchedulable appSchedulable) {
@ -129,7 +129,7 @@ public class FSSchedulerApp extends SchedulerApplication {
} }
public String getUser() { public String getUser() {
return this.appSchedulingInfo.getUser(); return appSchedulingInfo.getUser();
} }
public synchronized void updateResourceRequests( public synchronized void updateResourceRequests(
@ -138,19 +138,19 @@ public class FSSchedulerApp extends SchedulerApplication {
} }
public Map<String, ResourceRequest> getResourceRequests(Priority priority) { public Map<String, ResourceRequest> getResourceRequests(Priority priority) {
return this.appSchedulingInfo.getResourceRequests(priority); return appSchedulingInfo.getResourceRequests(priority);
} }
public int getNewContainerId() { public int getNewContainerId() {
return this.appSchedulingInfo.getNewContainerId(); return appSchedulingInfo.getNewContainerId();
} }
public Collection<Priority> getPriorities() { public Collection<Priority> getPriorities() {
return this.appSchedulingInfo.getPriorities(); return appSchedulingInfo.getPriorities();
} }
public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) { public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) {
return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress); return appSchedulingInfo.getResourceRequest(priority, nodeAddress);
} }
public synchronized int getTotalRequiredResources(Priority priority) { public synchronized int getTotalRequiredResources(Priority priority) {
@ -158,7 +158,7 @@ public class FSSchedulerApp extends SchedulerApplication {
} }
public Resource getResource(Priority priority) { public Resource getResource(Priority priority) {
return this.appSchedulingInfo.getResource(priority); return appSchedulingInfo.getResource(priority);
} }
/** /**
@ -167,11 +167,11 @@ public class FSSchedulerApp extends SchedulerApplication {
*/ */
@Override @Override
public boolean isPending() { public boolean isPending() {
return this.appSchedulingInfo.isPending(); return appSchedulingInfo.isPending();
} }
public String getQueueName() { public String getQueueName() {
return this.appSchedulingInfo.getQueueName(); return appSchedulingInfo.getQueueName();
} }
/** /**
@ -185,7 +185,7 @@ public class FSSchedulerApp extends SchedulerApplication {
public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) {
// Cleanup all scheduling information // Cleanup all scheduling information
this.appSchedulingInfo.stop(rmAppAttemptFinalState); appSchedulingInfo.stop(rmAppAttemptFinalState);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -196,7 +196,7 @@ public class FSSchedulerApp extends SchedulerApplication {
getRMContainer(containerId); getRMContainer(containerId);
if (rmContainer == null) { if (rmContainer == null) {
// Some unknown container sneaked into the system. Kill it. // Some unknown container sneaked into the system. Kill it.
this.rmContext.getDispatcher().getEventHandler() rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeCleanContainerEvent(nodeId, containerId)); .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
return; return;
} }
@ -272,7 +272,7 @@ public class FSSchedulerApp extends SchedulerApplication {
} }
synchronized public void addSchedulingOpportunity(Priority priority) { synchronized public void addSchedulingOpportunity(Priority priority) {
this.schedulingOpportunities.setCount(priority, schedulingOpportunities.setCount(priority,
schedulingOpportunities.count(priority) + 1); schedulingOpportunities.count(priority) + 1);
} }
@ -282,19 +282,19 @@ public class FSSchedulerApp extends SchedulerApplication {
* successfully did so. * successfully did so.
*/ */
synchronized public int getSchedulingOpportunities(Priority priority) { synchronized public int getSchedulingOpportunities(Priority priority) {
return this.schedulingOpportunities.count(priority); return schedulingOpportunities.count(priority);
} }
synchronized void resetReReservations(Priority priority) { synchronized void resetReReservations(Priority priority) {
this.reReservations.setCount(priority, 0); reReservations.setCount(priority, 0);
} }
synchronized void addReReservation(Priority priority) { synchronized void addReReservation(Priority priority) {
this.reReservations.add(priority); reReservations.add(priority);
} }
synchronized public int getReReservations(Priority priority) { synchronized public int getReReservations(Priority priority) {
return this.reReservations.count(priority); return reReservations.count(priority);
} }
public synchronized int getNumReservedContainers(Priority priority) { public synchronized int getNumReservedContainers(Priority priority) {
@ -458,8 +458,8 @@ public class FSSchedulerApp extends SchedulerApplication {
* @param priority The priority of the container scheduled. * @param priority The priority of the container scheduled.
*/ */
synchronized public void resetSchedulingOpportunities(Priority priority) { synchronized public void resetSchedulingOpportunities(Priority priority) {
this.lastScheduledContainer.put(priority, System.currentTimeMillis()); lastScheduledContainer.put(priority, System.currentTimeMillis());
this.schedulingOpportunities.setCount(priority, 0); schedulingOpportunities.setCount(priority, 0);
} }
/** /**
@ -494,14 +494,14 @@ public class FSSchedulerApp extends SchedulerApplication {
rackLocalityThreshold; rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold. // Relax locality constraints once we've surpassed threshold.
if (this.getSchedulingOpportunities(priority) > (numNodes * threshold)) { if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
if (allowed.equals(NodeType.NODE_LOCAL)) { if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL); allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
this.resetSchedulingOpportunities(priority); resetSchedulingOpportunities(priority);
} }
else if (allowed.equals(NodeType.RACK_LOCAL)) { else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH); allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
this.resetSchedulingOpportunities(priority); resetSchedulingOpportunities(priority);
} }
} }
return allowedLocalityLevel.get(priority); return allowedLocalityLevel.get(priority);
@ -512,7 +512,7 @@ public class FSSchedulerApp extends SchedulerApplication {
Priority priority, ResourceRequest request, Priority priority, ResourceRequest request,
Container container) { Container container) {
// Update allowed locality level // Update allowed locality level
NodeType allowed = this.allowedLocalityLevel.get(priority); NodeType allowed = allowedLocalityLevel.get(priority);
if (allowed != null) { if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) && if (allowed.equals(NodeType.OFF_SWITCH) &&
(type.equals(NodeType.NODE_LOCAL) || (type.equals(NodeType.NODE_LOCAL) ||
@ -532,9 +532,9 @@ public class FSSchedulerApp extends SchedulerApplication {
} }
// Create RMContainer // Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container, this RMContainer rmContainer = new RMContainerImpl(container,
.getApplicationAttemptId(), node.getNodeID(), this.rmContext getApplicationAttemptId(), node.getNodeID(), rmContext
.getDispatcher().getEventHandler(), this.rmContext .getDispatcher().getEventHandler(), rmContext
.getContainerAllocationExpirer()); .getContainerAllocationExpirer());
// Add it to allContainers list. // Add it to allContainers list.

View File

@ -67,25 +67,25 @@ public class FSSchedulerNode extends SchedulerNode {
} }
public RMNode getRMNode() { public RMNode getRMNode() {
return this.rmNode; return rmNode;
} }
public NodeId getNodeID() { public NodeId getNodeID() {
return this.rmNode.getNodeID(); return rmNode.getNodeID();
} }
public String getHttpAddress() { public String getHttpAddress() {
return this.rmNode.getHttpAddress(); return rmNode.getHttpAddress();
} }
@Override @Override
public String getHostName() { public String getHostName() {
return this.rmNode.getHostName(); return rmNode.getHostName();
} }
@Override @Override
public String getRackName() { public String getRackName() {
return this.rmNode.getRackName(); return rmNode.getRackName();
} }
/** /**
@ -112,17 +112,18 @@ public class FSSchedulerNode extends SchedulerNode {
@Override @Override
public synchronized Resource getAvailableResource() { public synchronized Resource getAvailableResource() {
return this.availableResource; return availableResource;
} }
@Override @Override
public synchronized Resource getUsedResource() { public synchronized Resource getUsedResource() {
return this.usedResource; return usedResource;
} }
private synchronized boolean isValidContainer(Container c) { private synchronized boolean isValidContainer(Container c) {
if (launchedContainers.containsKey(c.getId())) if (launchedContainers.containsKey(c.getId())) {
return true; return true;
}
return false; return false;
} }

View File

@ -139,11 +139,11 @@ public class FairScheduler implements ResourceScheduler {
public FairSchedulerConfiguration getConf() { public FairSchedulerConfiguration getConf() {
return this.conf; return conf;
} }
public QueueManager getQueueManager() { public QueueManager getQueueManager() {
return this.queueMgr; return queueMgr;
} }
public List<FSQueueSchedulable> getQueueSchedulables() { public List<FSQueueSchedulable> getQueueSchedulables() {
@ -183,36 +183,34 @@ public class FairScheduler implements ResourceScheduler {
* fair shares, deficits, minimum slot allocations, and amount of used and * fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job. * required resources per job.
*/ */
protected void update() { protected synchronized void update() {
synchronized (this) { queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file updateRunnability(); // Set job runnability based on user/queue limits
updateRunnability(); // Set job runnability based on user/queue limits updatePreemptionVariables(); // Determine if any queues merit preemption
updatePreemptionVariables(); // Determine if any queues merit preemption
// Update demands of apps and queues // Update demands of apps and queues
for (FSQueue queue: queueMgr.getQueues()) { for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().updateDemand(); queue.getQueueSchedulable().updateDemand();
}
// Compute fair shares based on updated demands
List<FSQueueSchedulable> queueScheds = this.getQueueSchedulables();
SchedulingAlgorithms.computeFairShares(
queueScheds, clusterCapacity);
// Update queue metrics for this queue
for (FSQueueSchedulable sched : queueScheds) {
sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
}
// Use the computed shares to assign shares within each queue
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().redistributeShare();
}
// Update recorded capacity of root queue (child queues are updated
// when fair share is calculated).
rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
} }
// Compute fair shares based on updated demands
List<FSQueueSchedulable> queueScheds = getQueueSchedulables();
SchedulingAlgorithms.computeFairShares(
queueScheds, clusterCapacity);
// Update queue metrics for this queue
for (FSQueueSchedulable sched : queueScheds) {
sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
}
// Use the computed shares to assign shares within each queue
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().redistributeShare();
}
// Update recorded capacity of root queue (child queues are updated
// when fair share is calculated).
rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
} }
/** /**
@ -257,17 +255,16 @@ public class FairScheduler implements ResourceScheduler {
* have been below half their fair share for the fairSharePreemptionTimeout. * have been below half their fair share for the fairSharePreemptionTimeout.
* If such queues exist, compute how many tasks of each type need to be * If such queues exist, compute how many tasks of each type need to be
* preempted and then select the right ones using preemptTasks. * preempted and then select the right ones using preemptTasks.
*
* This method computes and logs the number of tasks we want to preempt even
* if preemption is disabled, for debugging purposes.
*/ */
protected void preemptTasksIfNecessary() { protected void preemptTasksIfNecessary() {
if (!preemptionEnabled) if (!preemptionEnabled) {
return; return;
}
long curTime = clock.getTime(); long curTime = clock.getTime();
if (curTime - lastPreemptCheckTime < preemptionInterval) if (curTime - lastPreemptCheckTime < preemptionInterval) {
return; return;
}
lastPreemptCheckTime = curTime; lastPreemptCheckTime = curTime;
Resource resToPreempt = Resources.none(); Resource resToPreempt = Resources.none();
@ -288,8 +285,9 @@ public class FairScheduler implements ResourceScheduler {
* lowest priority to preempt. * lowest priority to preempt.
*/ */
protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) { protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
return; return;
}
Map<RMContainer, FSSchedulerApp> apps = Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>(); new HashMap<RMContainer, FSSchedulerApp>();
@ -330,7 +328,7 @@ public class FairScheduler implements ResourceScheduler {
// TODO: Not sure if this ever actually adds this to the list of cleanup // TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()). // containers on the RMNode (see SchedulerNode.releaseContainer()).
this.completedContainer(container, status, RMContainerEventType.KILL); completedContainer(container, status, RMContainerEventType.KILL);
toPreempt = Resources.subtract(toPreempt, toPreempt = Resources.subtract(toPreempt,
container.getContainer().getResource()); container.getContainer().getResource());
@ -413,7 +411,7 @@ public class FairScheduler implements ResourceScheduler {
} }
public RMContainerTokenSecretManager getContainerTokenSecretManager() { public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.rmContext.getContainerTokenSecretManager(); return rmContext.getContainerTokenSecretManager();
} }
public double getAppWeight(AppSchedulable app) { public double getAppWeight(AppSchedulable app) {
@ -437,28 +435,28 @@ public class FairScheduler implements ResourceScheduler {
@Override @Override
public Resource getMinimumResourceCapability() { public Resource getMinimumResourceCapability() {
return this.minimumAllocation; return minimumAllocation;
} }
@Override @Override
public Resource getMaximumResourceCapability() { public Resource getMaximumResourceCapability() {
return this.maximumAllocation; return maximumAllocation;
} }
public double getNodeLocalityThreshold() { public double getNodeLocalityThreshold() {
return this.nodeLocalityThreshold; return nodeLocalityThreshold;
} }
public double getRackLocalityThreshold() { public double getRackLocalityThreshold() {
return this.rackLocalityThreshold; return rackLocalityThreshold;
} }
public Resource getClusterCapacity() { public Resource getClusterCapacity() {
return this.clusterCapacity; return clusterCapacity;
} }
public Clock getClock() { public Clock getClock() {
return this.clock; return clock;
} }
protected void setClock(Clock clock) { protected void setClock(Clock clock) {
@ -478,11 +476,11 @@ public class FairScheduler implements ResourceScheduler {
addApplication(ApplicationAttemptId applicationAttemptId, addApplication(ApplicationAttemptId applicationAttemptId,
String queueName, String user) { String queueName, String user) {
FSQueue queue = this.queueMgr.getQueue(queueName); FSQueue queue = queueMgr.getQueue(queueName);
FSSchedulerApp schedulerApp = FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user, new FSSchedulerApp(applicationAttemptId, user,
queue.getQueueSchedulable(), new ActiveUsersManager(this.getRootQueueMetrics()), queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()),
rmContext, null); rmContext, null);
// Inforce ACLs // Inforce ACLs
@ -553,8 +551,8 @@ public class FairScheduler implements ResourceScheduler {
application.stop(rmAppAttemptFinalState); application.stop(rmAppAttemptFinalState);
// Inform the queue // Inform the queue
FSQueue queue = this.queueMgr.getQueue(application.getQueue().getQueueName()); FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName());
queue.removeJob(application); queue.removeApp(application);
// Remove from our data-structure // Remove from our data-structure
applications.remove(applicationAttemptId); applications.remove(applicationAttemptId);
@ -600,7 +598,7 @@ public class FairScheduler implements ResourceScheduler {
} }
private synchronized void addNode(RMNode node) { private synchronized void addNode(RMNode node) {
this.nodes.put(node.getNodeID(), new FSSchedulerNode(node)); nodes.put(node.getNodeID(), new FSSchedulerNode(node));
Resources.addTo(clusterCapacity, node.getTotalCapability()); Resources.addTo(clusterCapacity, node.getTotalCapability());
LOG.info("Added node " + node.getNodeAddress() + LOG.info("Added node " + node.getNodeAddress() +
@ -608,7 +606,7 @@ public class FairScheduler implements ResourceScheduler {
} }
private synchronized void removeNode(RMNode rmNode) { private synchronized void removeNode(RMNode rmNode) {
FSSchedulerNode node = this.nodes.get(rmNode.getNodeID()); FSSchedulerNode node = nodes.get(rmNode.getNodeID());
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
// Remove running containers // Remove running containers
@ -631,7 +629,7 @@ public class FairScheduler implements ResourceScheduler {
RMContainerEventType.KILL); RMContainerEventType.KILL);
} }
this.nodes.remove(rmNode.getNodeID()); nodes.remove(rmNode.getNodeID());
LOG.info("Removed node " + rmNode.getNodeAddress() + LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterCapacity); " cluster capacity: " + clusterCapacity);
} }
@ -669,10 +667,8 @@ public class FairScheduler implements ResourceScheduler {
} }
synchronized (application) { synchronized (application) {
if (!ask.isEmpty()) { if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
if(LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" + LOG.debug("allocate: pre-update" +
" applicationAttemptId=" + appAttemptId + " applicationAttemptId=" + appAttemptId +
" application=" + application.getApplicationId()); " application=" + application.getApplicationId());
@ -686,7 +682,7 @@ public class FairScheduler implements ResourceScheduler {
application.showRequests(); application.showRequests();
} }
if(LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate:" + LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId + " applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size()); " #ask=" + ask.size());
@ -764,7 +760,7 @@ public class FairScheduler implements ResourceScheduler {
int assignedContainers = 0; int assignedContainers = 0;
while (true) { while (true) {
// At most one task is scheduled each iteration of this loop // At most one task is scheduled each iteration of this loop
List<FSQueueSchedulable> scheds = this.getQueueSchedulables(); List<FSQueueSchedulable> scheds = getQueueSchedulables();
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator()); Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
boolean assignedContainer = false; boolean assignedContainer = false;
for (FSQueueSchedulable sched : scheds) { for (FSQueueSchedulable sched : scheds) {
@ -796,11 +792,11 @@ public class FairScheduler implements ResourceScheduler {
@Override @Override
public SchedulerAppReport getSchedulerAppInfo( public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) { ApplicationAttemptId appAttemptId) {
if (!this.applications.containsKey(appAttemptId)) { if (!applications.containsKey(appAttemptId)) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId); LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null; return null;
} }
return new SchedulerAppReport(this.applications.get(appAttemptId)); return new SchedulerAppReport(applications.get(appAttemptId));
} }
@Override @Override
@ -812,37 +808,30 @@ public class FairScheduler implements ResourceScheduler {
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {
switch(event.getType()) { switch(event.getType()) {
case NODE_ADDED: case NODE_ADDED:
{
if (!(event instanceof NodeAddedSchedulerEvent)) { if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode()); addNode(nodeAddedEvent.getAddedRMNode());
} break;
break;
case NODE_REMOVED: case NODE_REMOVED:
{
if (!(event instanceof NodeRemovedSchedulerEvent)) { if (!(event instanceof NodeRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
removeNode(nodeRemovedEvent.getRemovedRMNode()); removeNode(nodeRemovedEvent.getRemovedRMNode());
} break;
break;
case NODE_UPDATE: case NODE_UPDATE:
{
if (!(event instanceof NodeUpdateSchedulerEvent)) { if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
NodeUpdateSchedulerEvent nodeUpdatedEvent = NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event; (NodeUpdateSchedulerEvent)event;
this.nodeUpdate(nodeUpdatedEvent.getRMNode(), nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(), nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers()); nodeUpdatedEvent.getCompletedContainers());
} break;
break;
case APP_ADDED: case APP_ADDED:
{
if (!(event instanceof AppAddedSchedulerEvent)) { if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
@ -857,20 +846,16 @@ public class FairScheduler implements ResourceScheduler {
addApplication(appAddedEvent.getApplicationAttemptId(), queue, addApplication(appAddedEvent.getApplicationAttemptId(), queue,
appAddedEvent.getUser()); appAddedEvent.getUser());
} break;
break;
case APP_REMOVED: case APP_REMOVED:
{
if (!(event instanceof AppRemovedSchedulerEvent)) { if (!(event instanceof AppRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event; AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
this.removeApplication(appRemovedEvent.getApplicationAttemptID(), removeApplication(appRemovedEvent.getApplicationAttemptID(),
appRemovedEvent.getFinalAttemptState()); appRemovedEvent.getFinalAttemptState());
} break;
break;
case CONTAINER_EXPIRED: case CONTAINER_EXPIRED:
{
if (!(event instanceof ContainerExpiredSchedulerEvent)) { if (!(event instanceof ContainerExpiredSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
@ -882,8 +867,7 @@ public class FairScheduler implements ResourceScheduler {
containerId, containerId,
SchedulerUtils.EXPIRED_CONTAINER), SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE); RMContainerEventType.EXPIRE);
} break;
break;
default: default:
LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
} }
@ -897,9 +881,9 @@ public class FairScheduler implements ResourceScheduler {
@Override @Override
public synchronized void public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException { reinitialize(Configuration conf, RMContext rmContext) throws IOException {
if (!this.initialized) { if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf); this.conf = new FairSchedulerConfiguration(conf);
this.rootMetrics = QueueMetrics.forQueue("root", null, true, conf); rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext; this.rmContext = rmContext;
this.clock = new SystemClock(); this.clock = new SystemClock();
this.eventLog = new FairSchedulerEventLog(); this.eventLog = new FairSchedulerEventLog();
@ -973,7 +957,7 @@ public class FairScheduler implements ResourceScheduler {
@Override @Override
public int getNumClusterNodes() { public int getNumClusterNodes() {
return this.nodes.size(); return nodes.size();
} }
} }

View File

@ -31,7 +31,7 @@ import org.apache.hadoop.conf.Configured;
*/ */
@Private @Private
@Unstable @Unstable
public class NewJobWeightBooster extends Configured implements WeightAdjuster { public class NewAppWeightBooster extends Configured implements WeightAdjuster {
private static final float DEFAULT_FACTOR = 3; private static final float DEFAULT_FACTOR = 3;
private static final long DEFAULT_DURATION = 5 * 60 * 1000; private static final long DEFAULT_DURATION = 5 * 60 * 1000;

View File

@ -202,7 +202,7 @@ public class QueueManager {
* Get the queue for a given AppSchedulable. * Get the queue for a given AppSchedulable.
*/ */
public FSQueue getQueueForApp(AppSchedulable app) { public FSQueue getQueueForApp(AppSchedulable app) {
return this.getQueue(app.getApp().getQueueName()); return getQueue(app.getApp().getQueueName());
} }
/** /**
@ -388,7 +388,7 @@ public class QueueManager {
// Commit the reload; also create any queue defined in the alloc file // Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI. // if it does not already exist, so it can be displayed on the web UI.
synchronized(this) { synchronized (this) {
setMinResources(minQueueResources); setMinResources(minQueueResources);
setMaxResources(maxQueueResources); setMaxResources(maxQueueResources);
setQueueMaxApps(queueMaxApps); setQueueMaxApps(queueMaxApps);
@ -431,14 +431,14 @@ public class QueueManager {
synchronized(minQueueResourcesMO) { synchronized(minQueueResourcesMO) {
if (minQueueResources.containsKey(queue)) { if (minQueueResources.containsKey(queue)) {
return minQueueResources.get(queue); return minQueueResources.get(queue);
} else{ } else {
return Resources.createResource(0); return Resources.createResource(0);
} }
} }
} }
private void setMinResources(Map<String, Resource> resources) { private void setMinResources(Map<String, Resource> resources) {
synchronized(minQueueResourcesMO) { synchronized (minQueueResourcesMO) {
minQueueResources = resources; minQueueResources = resources;
} }
} }
@ -457,7 +457,7 @@ public class QueueManager {
} }
private void setMaxResources(Map<String, Resource> resources) { private void setMaxResources(Map<String, Resource> resources) {
synchronized(maxQueueResourcesMO) { synchronized (maxQueueResourcesMO) {
maxQueueResources = resources; maxQueueResources = resources;
} }
} }
@ -472,8 +472,8 @@ public class QueueManager {
/** /**
* Remove an app * Remove an app
*/ */
public synchronized void removeJob(FSSchedulerApp app) { public synchronized void removeApp(FSSchedulerApp app) {
getQueue(app.getQueueName()).removeJob(app); getQueue(app.getQueueName()).removeApp(app);
} }
/** /**
@ -543,7 +543,7 @@ public class QueueManager {
} }
private int getQueueMaxAppsDefault(){ private int getQueueMaxAppsDefault(){
synchronized(queueMaxAppsDefaultMO) { synchronized (queueMaxAppsDefaultMO) {
return queueMaxAppsDefault; return queueMaxAppsDefault;
} }
} }
@ -575,11 +575,12 @@ public class QueueManager {
queueWeights = weights; queueWeights = weights;
} }
} }
/** /**
* Get a queue's min share preemption timeout, in milliseconds. This is the * Get a queue's min share preemption timeout, in milliseconds. This is the
* time after which jobs in the queue may kill other queues' tasks if they * time after which jobs in the queue may kill other queues' tasks if they
* are below their min share. * are below their min share.
*/ */
public long getMinSharePreemptionTimeout(String queueName) { public long getMinSharePreemptionTimeout(String queueName) {
synchronized (minSharePreemptionTimeoutsMO) { synchronized (minSharePreemptionTimeoutsMO) {
if (minSharePreemptionTimeouts.containsKey(queueName)) { if (minSharePreemptionTimeouts.containsKey(queueName)) {

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurable;
/** /**
* A pluggable object for altering the weights of apps in the fair scheduler, * A pluggable object for altering the weights of apps in the fair scheduler,
* which is used for example by {@link NewJobWeightBooster} to give higher * which is used for example by {@link NewAppWeightBooster} to give higher
* weight to new jobs so that short jobs finish faster. * weight to new jobs so that short jobs finish faster.
* *
* May implement {@link Configurable} to access configuration parameters. * May implement {@link Configurable} to access configuration parameters.