Merge -r 1361019:1361020 from trunk to branch. FIXES: MAPREDUCE-3451

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1361023 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-07-13 00:47:53 +00:00
parent 0d9ef3c516
commit d9fc2b7f8e
24 changed files with 4495 additions and 4 deletions

View File

@ -6,6 +6,8 @@ Release 2.0.1-alpha - UNRELEASED
NEW FEATURES
MAPREDUCE-3451. Port Fair Scheduler to MR2 (pwendell via tucu)
IMPROVEMENTS
MAPREDUCE-4146. Support limits on task status string length and number of

View File

@ -91,6 +91,16 @@ public class Resources {
public static Resource multiply(Resource lhs, int by) {
return multiplyTo(clone(lhs), by);
}
/**
* Mutliply a resource by a {@code double}. Note that integral
* resource quantites are subject to rounding during cast.
*/
public static Resource multiply(Resource lhs, double by) {
Resource out = clone(lhs);
out.setMemory((int) (lhs.getMemory() * by));
return out;
}
public static boolean equals(Resource lhs, Resource rhs) {
return lhs.getMemory() == rhs.getMemory();

View File

@ -26,6 +26,14 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
/**
* Represents the ResourceManager's view of an application container. See
* {@link RMContainerImpl} for an implementation. Containers may be in one
* of several states, given in {@link RMContainerState}. An RMContainer
* instance may exist even if there is no actual running container, such as
* when resources are being reserved to fill space for a future container
* allocation.
*/
public interface RMContainer extends EventHandler<RMContainerEvent> {
ContainerId getContainerId();

View File

@ -20,10 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
public enum RMContainerEventType {
// Source: scheduler
START,
// Source: SchedulerApp
START,
ACQUIRED,
KILL, // Also from Node on NodeRemoval
RESERVED,

View File

@ -45,6 +45,9 @@ public class SchedulerUtils {
public static final String LOST_CONTAINER =
"Container released on a *lost* node";
public static final String PREEMPTED_CONTAINER =
"Container preempted by scheduler";
public static final String COMPLETED_APPLICATION =
"Container of a completed application";

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Thrown when the allocation file for {@link QueueManager} is malformed.
*/
@Private
@Unstable
public class AllocationConfigurationException extends Exception {
private static final long serialVersionUID = 4046517047810854249L;
public AllocationConfigurationException(String message) {
super(message);
}
}

View File

@ -0,0 +1,337 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@Private
@Unstable
public class AppSchedulable extends Schedulable {
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
private boolean runnable = false; // everyone starts as not runnable
private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
private FSQueue queue;
private ContainerTokenSecretManager containerTokenSecretManager;
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSQueue queue) {
this.scheduler = scheduler;
this.app = app;
this.startTime = System.currentTimeMillis();
this.queue = queue;
this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager();
}
@Override
public String getName() {
return app.getApplicationId().toString();
}
public SchedulerApp getApp() {
return app;
}
@Override
public void updateDemand() {
demand = Resources.createResource(0);
// Demand is current consumption plus outstanding requests
Resources.addTo(demand, app.getCurrentConsumption());
// Add up outstanding resource requests
for (Priority p : app.getPriorities()) {
for (ResourceRequest r : app.getResourceRequests(p).values()) {
Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
Resources.addTo(demand, total);
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public void redistributeShare() {}
@Override
public Resource getResourceUsage() {
return this.app.getCurrentConsumption();
}
@Override
public Resource getMinShare() {
return Resources.createResource(0);
}
/**
* Get metrics reference from containing queue.
*/
public QueueMetrics getMetrics() {
return this.queue.getQueueSchedulable().getMetrics();
}
@Override
public double getWeight() {
return scheduler.getAppWeight(this);
}
@Override
public Priority getPriority() {
// Right now per-app priorities are not passed to scheduler,
// so everyone has the same priority.
Priority p = recordFactory.newRecordInstance(Priority.class);
p.setPriority(1);
return p;
}
/**
* Is this application runnable? Runnable means that the user and queue
* application counts are within configured quotas.
*/
public boolean getRunnable() {
return runnable;
}
public void setRunnable(boolean runnable) {
this.runnable = runnable;
}
/**
* Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and
* priority.
*/
public Container createContainer(SchedulerApp application, SchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
ContainerToken containerToken = null;
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
containerToken =
containerTokenSecretManager.createContainerToken(containerId, nodeId,
capability);
if (containerToken == null) {
return null; // Try again later.
}
}
// Create the container
Container container = BuilderUtils.newContainer(containerId, nodeId,
node.getRMNode().getHttpAddress(), capability, priority,
containerToken);
return container;
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply
* update relevant bookeeping. This dispatches ro relevant handlers
* in the {@link SchedulerNode} and {@link SchedulerApp} classes.
*/
private void reserve(SchedulerApp application, Priority priority,
SchedulerNode node, Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getHostName() +
" app_id=" + app.getApplicationId());
if (!alreadyReserved) {
getMetrics().reserveResource(application.getUser(), container.getResource());
RMContainer rmContainer = application.reserve(node, priority, null,
container);
node.reserveResource(application, priority, rmContainer);
getMetrics().reserveResource(this.app.getUser(),
container.getResource());
scheduler.getRootQueueMetrics().reserveResource(this.app.getUser(),
container.getResource());
}
else {
RMContainer rmContainer = node.getReservedContainer();
application.reserve(node, priority, rmContainer, container);
node.reserveResource(application, priority, rmContainer);
}
}
/**
* Remove the reservation on {@code node} for {@ application} at the given
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
* handlers for an unreservation.
*/
private void unreserve(SchedulerApp application, Priority priority,
SchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
application.unreserve(node, priority);
node.unreserveResource(application);
getMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
scheduler.getRootQueueMetrics().unreserveResource(
application.getUser(), rmContainer.getContainer().getResource());
}
/**
* Assign a container to this node to facilitate {@code request}. If node does
* not have enough memory, create a reservation. This is called once we are
* sure the particular request should be facilitated by this node.
*/
private Resource assignContainer(SchedulerNode node,
SchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, boolean reserved) {
// How much does this request need?
Resource capability = request.getCapability();
// How much does the node have?
Resource available = node.getAvailableResource();
Container container = null;
if (reserved) {
container = node.getReservedContainer().getContainer();
} else {
container = createContainer(application, node, capability, priority);
}
// Can we allocate a container on this node?
int availableContainers =
available.getMemory() / capability.getMemory();
if (availableContainers > 0) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
application.allocate(type, node, priority, request, container);
if (allocatedContainer == null) {
// Did the application need this resource?
return Resources.none();
}
else {
// TODO this should subtract resource just assigned
// TEMPROARY
getMetrics().setAvailableResourcesToQueue(
this.scheduler.getClusterCapacity());
}
// If we had previously made a reservation, delete it
if (reserved) {
this.unreserve(application, priority, node);
}
// Inform the node
node.allocateContainer(application.getApplicationId(),
allocatedContainer);
return container.getResource();
} else {
// The desired container won't fit here, so reserve
reserve(application, priority, node, container, reserved);
return Resources.none();
}
}
@Override
public Resource assignContainer(SchedulerNode node, boolean reserved) {
LOG.info("Node offered to app: " + getName() + " reserved: " + reserved);
if (reserved) {
RMContainer rmContainer = node.getReservedContainer();
Priority priority = rmContainer.getReservedPriority();
// Make sure the application still needs requests at this priority
if (app.getTotalRequiredResources(priority) == 0) {
this.unreserve(app, priority, node);
return Resources.none();
}
} else {
// If this app is over quota, don't schedule anything
if (!(getRunnable())) { return Resources.none(); }
}
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
for (Priority priority : app.getPriorities()) {
app.addSchedulingOpportunity(priority);
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
ResourceRequest localRequest = app.getResourceRequest(priority,
node.getHostName());
if (localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, app, priority,
localRequest, NodeType.NODE_LOCAL, reserved);
}
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, app, priority, rackLocalRequest,
NodeType.RACK_LOCAL, reserved);
}
ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
RMNode.ANY);
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, app, priority, offSwitchRequest,
NodeType.OFF_SWITCH, reserved);
}
}
return Resources.none();
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
/**
* A queue containing several applications.
*/
@Private
@Unstable
public class FSQueue {
/** Queue name. */
private String name;
/** Applications in this specific queue; does not include children queues' jobs. */
private Collection<SchedulerApp> applications = new ArrayList<SchedulerApp>();
/** Scheduling mode for jobs inside the queue (fair or FIFO) */
private SchedulingMode schedulingMode;
private FairScheduler scheduler;
private FSQueueSchedulable queueSchedulable;
public FSQueue(FairScheduler scheduler, String name) {
this.name = name;
this.queueSchedulable = new FSQueueSchedulable(scheduler, this);
this.scheduler = scheduler;
}
public Collection<SchedulerApp> getApplications() {
return applications;
}
public void addApp(FSSchedulerApp app) {
applications.add(app);
queueSchedulable.addApp(new AppSchedulable(scheduler, app, this));
}
public void removeJob(SchedulerApp app) {
applications.remove(app);
queueSchedulable.removeApp(app);
}
public String getName() {
return name;
}
public SchedulingMode getSchedulingMode() {
return schedulingMode;
}
public void setSchedulingMode(SchedulingMode schedulingMode) {
this.schedulingMode = schedulingMode;
}
public FSQueueSchedulable getQueueSchedulable() {
return queueSchedulable;
}
}

View File

@ -0,0 +1,285 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@Private
@Unstable
public class FSQueueSchedulable extends Schedulable implements Queue {
public static final Log LOG = LogFactory.getLog(
FSQueueSchedulable.class.getName());
private FairScheduler scheduler;
private FSQueue queue;
private QueueManager queueMgr;
private List<AppSchedulable> appScheds = new LinkedList<AppSchedulable>();
private Resource demand = Resources.createResource(0);
private QueueMetrics metrics;
private RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// Variables used for preemption
long lastTimeAtMinShare;
long lastTimeAtHalfFairShare;
public FSQueueSchedulable(FairScheduler scheduler, FSQueue queue) {
this.scheduler = scheduler;
this.queue = queue;
this.queueMgr = scheduler.getQueueManager();
this.metrics = QueueMetrics.forQueue(this.getName(), null, true, scheduler.getConf());
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
}
public void addApp(AppSchedulable app) {
appScheds.add(app);
}
public void removeApp(SchedulerApp app) {
for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
AppSchedulable appSched = it.next();
if (appSched.getApp() == app) {
it.remove();
break;
}
}
}
/**
* Update demand by asking apps in the queue to update
*/
@Override
public void updateDemand() {
demand = Resources.createResource(0);
for (AppSchedulable sched: appScheds) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
LOG.debug("Counting resource from " + sched.getName() + " " + toAdd.toString());
LOG.debug("Total resource consumption for " + this.getName() + " now " + demand.toString());
demand = Resources.add(demand, toAdd);
}
// if demand exceeds the cap for this queue, limit to the max
Resource maxRes = queueMgr.getMaxResources(queue.getName());
if(Resources.greaterThan(demand, maxRes)) {
demand = maxRes;
}
}
/**
* Distribute the queue's fair share among its jobs
*/
@Override
public void redistributeShare() {
if (queue.getSchedulingMode() == SchedulingMode.FAIR) {
SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
} else {
for (AppSchedulable sched: appScheds) {
sched.setFairShare(Resources.createResource(0));
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public Resource getMinShare() {
return queueMgr.getMinResources(queue.getName());
}
@Override
public double getWeight() {
return queueMgr.getQueueWeight(queue.getName());
}
@Override
public long getStartTime() {
return 0;
}
@Override
public Resource assignContainer(SchedulerNode node, boolean reserved) {
LOG.debug("Node offered to queue: " + this.getName() + " reserved: " + reserved);
// If this queue is over its limit, reject
if (Resources.greaterThan(this.getResourceUsage(),
queueMgr.getMaxResources(queue.getName()))) {
return Resources.none();
}
// If this node already has reserved resources for an app, first try to
// finish allocating resources for that app.
if (reserved) {
for (AppSchedulable sched : appScheds) {
if (sched.getApp().getApplicationAttemptId() ==
node.getReservedContainer().getApplicationAttemptId()) {
return sched.assignContainer(node, reserved);
}
}
return Resources.none(); // We should never get here
}
// Otherwise, chose app to schedule based on given policy (fair vs fifo).
else {
SchedulingMode mode = queue.getSchedulingMode();
Comparator<Schedulable> comparator;
if (mode == SchedulingMode.FIFO) {
comparator = new SchedulingAlgorithms.FifoComparator();
} else if (mode == SchedulingMode.FAIR) {
comparator = new SchedulingAlgorithms.FairShareComparator();
} else {
throw new RuntimeException("Unsupported queue scheduling mode " + mode);
}
Collections.sort(appScheds, comparator);
for (AppSchedulable sched: appScheds) {
return sched.assignContainer(node, reserved);
}
return Resources.none();
}
}
@Override
public String getName() {
return queue.getName();
}
FSQueue getQueue() {
return queue;
}
public Collection<AppSchedulable> getAppSchedulables() {
return appScheds;
}
public long getLastTimeAtMinShare() {
return lastTimeAtMinShare;
}
public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
this.lastTimeAtMinShare = lastTimeAtMinShare;
}
public long getLastTimeAtHalfFairShare() {
return lastTimeAtHalfFairShare;
}
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
@Override
public QueueMetrics getMetrics() {
return metrics;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
for (AppSchedulable app : appScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
}
@Override
public Priority getPriority() {
Priority p = recordFactory.newRecordInstance(Priority.class);
p.setPriority(1);
return p;
}
@Override
public Map<QueueACL, AccessControlList> getQueueAcls() {
Map<QueueACL, AccessControlList> acls = this.queueMgr.getQueueAcls(this.getName());
return new HashMap<QueueACL, AccessControlList>(acls);
}
@Override
public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(getQueueName());
// TODO: we might change these queue metrics around a little bit
// to match the semantics of the fair scheduler.
queueInfo.setCapacity((float) getFairShare().getMemory() /
scheduler.getClusterCapacity().getMemory());
queueInfo.setCapacity((float) getResourceUsage().getMemory() /
scheduler.getClusterCapacity().getMemory());
queueInfo.setChildQueues(new ArrayList<QueueInfo>());
queueInfo.setQueueState(QueueState.RUNNING);
return queueInfo;
}
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
QueueUserACLInfo userAclInfo =
recordFactory.newRecordInstance(QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>();
for (QueueACL operation : QueueACL.values()) {
Map<QueueACL, AccessControlList> acls = this.queueMgr.getQueueAcls(this.getName());
if (acls.get(operation).isUserAllowed(user)) {
operations.add(operation);
}
}
userAclInfo.setQueueName(getQueueName());
userAclInfo.setUserAcls(operations);
return Collections.singletonList(userAclInfo);
}
@Override
public String getQueueName() {
return getName();
}
}

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* This class extends the application lifecycle management contained with
* the {@link SchedulerApp} class and adds delay-scheduling information
* specific to the FairScheduler.
*/
public class FSSchedulerApp extends SchedulerApp {
private static final Log LOG = LogFactory.getLog(SchedulerApp.class);
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To acheive this
* we first only allow node-local assigments for a given prioirty level,
* then relax the locality threshold once we've had a long enough period
* without succesfully scheduling. We measure both the number of "missed"
* scheduling opportunities since the last container was scheduled
* at the current allowed level and the time since the last container
* was scheduled. Currently we use only the former.
*/
// Current locality threshold
final Map<Priority, NodeType> allowedLocalityLevel = new HashMap<
Priority, NodeType>();
// Time of the last container scheduled at the current allowed level
Map<Priority, Long> lastScheduledContainer = new HashMap<Priority, Long>();
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, ApplicationStore store) {
super(applicationAttemptId, user, queue, activeUsersManager,
rmContext, store);
}
/**
* Should be called when an application has successfully scheduled a container,
* or when the scheduling locality threshold is relaxed.
* Reset various internal counters which affect delay scheduling
*
* @param priority The priority of the container scheduled.
*/
synchronized public void resetSchedulingOpportunities(Priority priority) {
this.lastScheduledContainer.put(priority, System.currentTimeMillis());
super.resetSchedulingOpportunities(priority);
}
/**
* Return the level at which we are allowed to schedule containers, given the
* current size of the cluster and thresholds indicating how many nodes to
* fail at (as a fraction of cluster size) before relaxing scheduling
* constraints.
*/
public synchronized NodeType getAllowedLocalityLevel(Priority priority,
int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
// upper limit on threshold
if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
// If delay scheduling is not being used, can schedule anywhere
if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
return NodeType.OFF_SWITCH;
}
// Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// If level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold.
if (this.getSchedulingOpportunities(priority) > (numNodes * threshold)) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
this.resetSchedulingOpportunities(priority);
}
else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
this.resetSchedulingOpportunities(priority);
}
}
return allowedLocalityLevel.get(priority);
}
synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
// Update allowed locality level
NodeType allowed = this.allowedLocalityLevel.get(priority);
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) &&
(type.equals(NodeType.NODE_LOCAL) ||
type.equals(NodeType.RACK_LOCAL))) {
this.resetAllowedLocalityLevel(priority, type);
}
else if (allowed.equals(NodeType.RACK_LOCAL) &&
type.equals(NodeType.NODE_LOCAL)) {
this.resetAllowedLocalityLevel(priority, type);
}
}
return super.allocate(type, node, priority, request, container);
}
/**
* Should be called when the scheduler assigns a container at a higher
* degree of locality than the current threshold. Reset the allowed locality
* level to a higher degree of locality.
*/
public synchronized void resetAllowedLocalityLevel(Priority priority,
NodeType level) {
NodeType old = allowedLocalityLevel.get(priority);
LOG.info("Raising locality level from " + old + " to " + level + " at " +
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
}

View File

@ -0,0 +1,981 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
public class FairScheduler implements ResourceScheduler {
private boolean initialized;
private FairSchedulerConfiguration conf;
private ContainerTokenSecretManager containerTokenSecretManager;
private RMContext rmContext;
private Resource minimumAllocation;
private Resource maximumAllocation;
private QueueManager queueMgr;
private Clock clock;
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
// Whether to use username in place of "default" queue name
private boolean userAsDefaultQueue = false;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
private static final Allocation EMPTY_ALLOCATION =
new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
// Aggregate metrics
QueueMetrics rootMetrics;
//Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
//Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
protected Map<ApplicationAttemptId, SchedulerApp> applications
= new HashMap<ApplicationAttemptId, SchedulerApp>();
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, SchedulerNode> nodes =
new ConcurrentHashMap<NodeId, SchedulerNode>();
// Aggregate capacity of the cluster
private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted (must be longer than a couple
// of heartbeats to give task-kill commands a chance to act).
protected long preemptionInterval = 15000;
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
public FairSchedulerConfiguration getConf() {
return this.conf;
}
public QueueManager getQueueManager() {
return this.queueMgr;
}
public List<FSQueueSchedulable> getQueueSchedulables() {
List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
for (FSQueue queue: queueMgr.getQueues()) {
scheds.add(queue.getQueueSchedulable());
}
return scheds;
}
private RMContainer getRMContainer(ContainerId containerId) {
SchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
/**
* A runnable which calls {@link FairScheduler#update()} every
* <code>UPDATE_INTERVAL</code> milliseconds.
*/
private class UpdateThread implements Runnable {
public void run() {
while (initialized) {
try {
Thread.sleep(UPDATE_INTERVAL);
update();
preemptTasksIfNecessary();
} catch (Exception e) {
LOG.error("Exception in fair scheduler UpdateThread", e);
}
}
}
}
/**
* Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
*/
protected void update() {
synchronized (this) {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption
// Update demands of apps and queues
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().updateDemand();
}
// Compute fair shares based on updated demands
List<FSQueueSchedulable> queueScheds = this.getQueueSchedulables();
SchedulingAlgorithms.computeFairShares(
queueScheds, clusterCapacity);
// Update queue metrics for this queue
for (FSQueueSchedulable sched : queueScheds) {
sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
}
// Use the computed shares to assign shares within each queue
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().redistributeShare();
}
// Update recorded capacity of root queue (child queues are updated
// when fair share is calculated).
rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
}
}
/**
* Update the preemption fields for all QueueScheduables, i.e. the times since
* each queue last was at its guaranteed share and at > 1/2 of its fair share
* for each type of task.
*/
private void updatePreemptionVariables() {
long now = clock.getTime();
lastPreemptionUpdateTime = now;
for (FSQueueSchedulable sched: getQueueSchedulables()) {
if (!isStarvedForMinShare(sched)) {
sched.setLastTimeAtMinShare(now);
}
if (!isStarvedForFairShare(sched)) {
sched.setLastTimeAtHalfFairShare(now);
}
}
}
/**
* Is a queue below its min share for the given task type?
*/
boolean isStarvedForMinShare(FSQueueSchedulable sched) {
Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredShare);
}
/**
* Is a queue being starved for fair share for the given task type?
* This is defined as being below half its fair share.
*/
boolean isStarvedForFairShare(FSQueueSchedulable sched) {
Resource desiredFairShare = Resources.max(
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
}
/**
* Check for queues that need tasks preempted, either because they have been
* below their guaranteed share for minSharePreemptionTimeout or they
* have been below half their fair share for the fairSharePreemptionTimeout.
* If such queues exist, compute how many tasks of each type need to be
* preempted and then select the right ones using preemptTasks.
*
* This method computes and logs the number of tasks we want to preempt even
* if preemption is disabled, for debugging purposes.
*/
protected void preemptTasksIfNecessary() {
if (!preemptionEnabled)
return;
long curTime = clock.getTime();
if (curTime - lastPreemptCheckTime < preemptionInterval)
return;
lastPreemptCheckTime = curTime;
Resource resToPreempt = Resources.none();
for (FSQueueSchedulable sched: getQueueSchedulables()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(resToPreempt, Resources.none())) {
preemptResources(getQueueSchedulables(), resToPreempt);
}
}
/**
* Preempt a quantity of resources from a list of QueueSchedulables.
* The policy for this is to pick apps from queues that are over their fair
* share, but make sure that no queue is placed below its fair share in the
* process. We further prioritize preemption by choosing containers with
* lowest priority to preempt.
*/
protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none()))
return;
Map<RMContainer, SchedulerApp> apps = new HashMap<RMContainer, SchedulerApp>();
Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
// Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>();
for (FSQueueSchedulable sched: scheds) {
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as: sched.getAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
queues.put(c, sched);
}
}
}
}
// Sort containers into reverse order of priority
Collections.sort(runningContainers, new Comparator<RMContainer>() {
public int compare(RMContainer c1, RMContainer c2) {
return c2.getContainer().getPriority().compareTo(
c1.getContainer().getPriority());
}
});
// Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any queue
for (RMContainer container: runningContainers) {
FSQueueSchedulable sched = queues.get(container);
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + sched.getQueue().getName());
ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
this.completedContainer(container, status, RMContainerEventType.KILL);
toPreempt = Resources.subtract(toPreempt,
container.getContainer().getResource());
if (Resources.equals(toPreempt, Resources.none())) {
break;
}
}
}
}
/**
* Return the resource amount that this queue is allowed to preempt, if any.
* If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and
* this min share. If it has been below half its fair share for at least the
* fairSharePreemptionTimeout, it should preempt enough tasks to get up to
* its full fair share. If both conditions hold, we preempt the max of the
* two amounts (this shouldn't happen unless someone sets the timeouts to
* be identical for some reason).
*/
protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
String queue = sched.getName();
long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(Resources.none(),
Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
resDueToFairShare = Resources.max(Resources.none(),
Resources.subtract(target, sched.getResourceUsage()));
}
Resource resToPreempt = Resources.max(resDueToMinShare, resDueToFairShare);
if (Resources.greaterThan(resToPreempt, Resources.none())) {
String message = "Should preempt " + resToPreempt + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare
+ ", resDueToFairShare = " + resDueToFairShare;
LOG.info(message);
}
return resToPreempt;
}
/**
* This updates the runnability of all apps based on whether or not
* any users/queues have exceeded their capacity.
*/
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
for (FSQueue p: queueMgr.getQueues()) {
for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
}
// Create a list of sorted jobs in order of start time and priority
Collections.sort(apps, new FifoAppComparator());
// Mark jobs as runnable in order of start time and priority, until
// user or queue limits have been reached.
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
for (AppSchedulable app: apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
int queueCount = queueApps.containsKey(queue) ? queueApps.get(queue) : 0;
if (userCount < queueMgr.getUserMaxApps(user) &&
queueCount < queueMgr.getQueueMaxApps(queue)) {
userApps.put(user, userCount + 1);
queueApps.put(queue, queueCount + 1);
app.setRunnable(true);
}
}
}
public ContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;
}
public double getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
return 1.0;
} else {
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current demand
weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
}
weight *= app.getPriority().getPriority();
if (weightAdjuster != null) {
// Run weight through the user-supplied weightAdjuster
weight = weightAdjuster.adjustWeight(app, weight);
}
return weight;
}
}
@Override
public Resource getMinimumResourceCapability() {
return this.minimumAllocation;
}
@Override
public Resource getMaximumResourceCapability() {
return this.maximumAllocation;
}
public double getNodeLocalityThreshold() {
return this.nodeLocalityThreshold;
}
public double getRackLocalityThreshold() {
return this.rackLocalityThreshold;
}
public Resource getClusterCapacity() {
return this.clusterCapacity;
}
public Clock getClock() {
return this.clock;
}
protected void setClock(Clock clock) {
this.clock = clock;
}
public FairSchedulerEventLog getEventLog() {
return eventLog;
}
/**
* Add a new application to the scheduler, with a given id, queue name,
* and user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
protected synchronized void
addApplication(ApplicationAttemptId applicationAttemptId,
String queueName, String user) {
FSQueue queue = this.queueMgr.getQueue(queueName);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
queue.getQueueSchedulable(), new ActiveUsersManager(this.getRootQueueMetrics()),
rmContext, null);
// Inforce ACLs
UserGroupInformation userUgi;
try {
userUgi = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
LOG.info("Failed to get current user information");
return;
}
List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo(
userUgi); // Always a signleton list
if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
LOG.info("User " + userUgi.getUserName() +
" cannot submit" + " applications to queue " + queue.getName());
return;
}
queue.addApp(schedulerApp);
queue.getQueueSchedulable().getMetrics().submitApp(user,
applicationAttemptId.getAttemptId());
rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
LOG.info("Application Submission: " + applicationAttemptId +
", user: " + user +
", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(applicationAttemptId,
RMAppAttemptEventType.APP_ACCEPTED));
}
private synchronized void removeApplication(
ApplicationAttemptId applicationAttemptId,
RMAppAttemptState rmAppAttemptFinalState) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
SchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
return;
}
// Release all the running containers
for (RMContainer rmContainer : application.getLiveContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
}
// Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
application.stop(rmAppAttemptFinalState);
// Inform the queue
FSQueue queue = this.queueMgr.getQueue(application.getQueue().getQueueName());
queue.removeJob(application);
// Remove from our data-structure
applications.remove(applicationAttemptId);
}
/**
* Clean up a completed container.
*/
private synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
return;
}
Container container = rmContainer.getContainer();
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
SchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Container " + container + " of" +
" unknown application " + applicationAttemptId +
" completed with event " + event);
return;
}
// Get the node on which the container was allocated
SchedulerNode node = nodes.get(container.getNodeId());
if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(node, rmContainer.getReservedPriority());
node.unreserveResource(application);
} else {
application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container);
}
LOG.info("Application " + applicationAttemptId +
" released container " + container.getId() +
" on node: " + node +
" with event: " + event);
}
private synchronized void addNode(RMNode node) {
this.nodes.put(node.getNodeID(), new SchedulerNode(node));
Resources.addTo(clusterCapacity, node.getTotalCapability());
LOG.info("Added node " + node.getNodeAddress() +
" cluster capacity: " + clusterCapacity);
}
private synchronized void removeNode(RMNode rmNode) {
SchedulerNode node = this.nodes.get(rmNode.getNodeID());
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
// Remove reservations, if any
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
completedContainer(reservedContainer,
SchedulerUtils.createAbnormalContainerStatus(
reservedContainer.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
this.nodes.remove(rmNode.getNodeID());
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterCapacity);
}
@Override
public Allocation allocate(ApplicationAttemptId appAttemptId,
List<ResourceRequest> ask, List<ContainerId> release) {
// Make sure this application exists
SchedulerApp application = applications.get(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
return EMPTY_ALLOCATION;
}
// Sanity check
SchedulerUtils.normalizeRequests(ask, minimumAllocation.getMemory());
// Release containers
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) {
RMAuditLogger.logFailure(application.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "FairScheduler",
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
releasedContainerId,
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
synchronized (application) {
if (!ask.isEmpty()) {
if(LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" +
" applicationAttemptId=" + appAttemptId +
" application=" + application.getApplicationId());
}
application.showRequests();
// Update application requests
application.updateResourceRequests(ask);
LOG.debug("allocate: post-update");
application.showRequests();
}
if(LOG.isDebugEnabled()) {
LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size());
}
return new Allocation(
application.pullNewlyAllocatedContainers(),
application.getHeadroom());
}
}
/**
* Process a container which has launched on a node, as reported by the
* node.
*/
private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
SchedulerApp application = applications.get(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + containerId +
" on node: " + node);
return;
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
}
/**
* Process a heartbeat update from a node.
*/
private synchronized void nodeUpdate(RMNode nm,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
eventLog.log("HEARTBEAT", nm.getHostName());
SchedulerNode node = nodes.get(nm.getNodeID());
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
// If we have have an application that has reserved a resource on this node
// already, we try to complete the reservation.
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
SchedulerApp reservedApplication =
applications.get(reservedContainer.getApplicationAttemptId());
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm);
FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName());
queue.getQueueSchedulable().assignContainer(node, true);
}
// Otherwise, schedule at queue which is furthest below fair share
else {
int assignedContainers = 0;
while (true) {
// At most one task is scheduled each iteration of this loop
List<FSQueueSchedulable> scheds = this.getQueueSchedulables();
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
boolean assignedContainer = false;
for (FSQueueSchedulable sched : scheds) {
Resource assigned = sched.assignContainer(node, false);
if (Resources.greaterThan(assigned, Resources.none())) {
eventLog.log("ASSIGN", nm.getHostName(), assigned);
assignedContainers++;
assignedContainer = true;
break;
}
}
if (!assignedContainer) { break; }
if (!assignMultiple) { break; }
if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
}
}
}
@Override
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
SchedulerNode node = nodes.get(nodeId);
return node == null ? null : new SchedulerNodeReport(node);
}
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
if (!this.applications.containsKey(appAttemptId)) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
return new SchedulerAppReport(this.applications.get(appAttemptId));
}
@Override
public QueueMetrics getRootQueueMetrics() {
return rootMetrics;
}
@Override
public void handle(SchedulerEvent event) {
switch(event.getType()) {
case NODE_ADDED:
{
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
}
break;
case NODE_REMOVED:
{
if (!(event instanceof NodeRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
removeNode(nodeRemovedEvent.getRemovedRMNode());
}
break;
case NODE_UPDATE:
{
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
this.nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
}
break;
case APP_ADDED:
{
if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
String queue = appAddedEvent.getQueue();
// Potentially set queue to username if configured to do so
String def = YarnConfiguration.DEFAULT_QUEUE_NAME;
if (queue.equals(def) && userAsDefaultQueue) {
queue = appAddedEvent.getUser();
}
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
appAddedEvent.getUser());
}
break;
case APP_REMOVED:
{
if (!(event instanceof AppRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
this.removeApplication(appRemovedEvent.getApplicationAttemptID(),
appRemovedEvent.getFinalAttemptState());
}
break;
case CONTAINER_EXPIRED:
{
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
containerId,
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
}
break;
default:
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
}
}
@Override
public void recover(RMState state) throws Exception {
// NOT IMPLEMENTED
}
@Override
public synchronized void reinitialize(Configuration conf,
ContainerTokenSecretManager containerTokenSecretManager,
RMContext rmContext)
throws IOException
{
if (!this.initialized) {
this.conf = new FairSchedulerConfiguration(conf);
this.rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.containerTokenSecretManager = containerTokenSecretManager;
this.rmContext = rmContext;
this.clock = new SystemClock();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
minimumAllocation = this.conf.getMinimumMemoryAllocation();
maximumAllocation = this.conf.getMaximumMemoryAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
Thread updateThread = new Thread(new UpdateThread());
updateThread.start();
initialized = true;
sizeBasedWeight = this.conf.getSizeBasedWeight();
queueMgr = new QueueManager(this);
try {
queueMgr.initialize();
}
catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
} else {
this.conf = new FairSchedulerConfiguration(conf);
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
try {
queueMgr.reloadAllocs();
}
catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
}
}
@Override
public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
boolean recursive) throws IOException {
if (!queueMgr.exists(queueName)) {
return null;
}
return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo(
includeChildQueues, recursive);
}
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo() {
UserGroupInformation user = null;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
return new ArrayList<QueueUserACLInfo>();
}
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
for (FSQueue queue : queueMgr.getQueues()) {
userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
}
return userAcls;
}
@Override
public int getNumClusterNodes() {
return this.nodes.size();
}
}

View File

@ -0,0 +1,111 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
public class FairSchedulerConfiguration extends Configuration {
public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml";
private static final String CONF_PREFIX = "yarn.scheduler.fair.";
protected static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
protected static final String EVENT_LOG_DIR = "eventlog.dir";
/** Whether to use the user name as the queue name (instead of "default") if
* the request does not specify a queue. */
protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = false;
protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f;
/** Cluster threshold for node locality. */
protected static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX + "locality.threshold.node";
protected static final float DEFAULT_LOCALITY_THRESHOLD_NODE =
DEFAULT_LOCALITY_THRESHOLD;
/** Cluster threshold for rack locality. */
protected static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX + "locality.threshold.rack";
protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
DEFAULT_LOCALITY_THRESHOLD;
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
/** Whether to assign multiple containers in one check-in. */
protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
protected static final boolean DEFAULT_ASSIGN_MULTIPLE = true;
/** Whether to give more weight to apps requiring many resources. */
protected static final String SIZE_BASED_WEIGHT = CONF_PREFIX + "sizebasedweight";
protected static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;
/** Maximum number of containers to assign on each check-in. */
protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
protected static final int DEFAULT_MAX_ASSIGN = -1;
public FairSchedulerConfiguration(Configuration conf) {
super(conf);
addResource(FS_CONFIGURATION_FILE);
}
public Resource getMinimumMemoryAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
return Resources.createResource(mem);
}
public Resource getMaximumMemoryAllocation() {
int mem = getInt(
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
return Resources.createResource(mem);
}
public boolean getUserAsDefaultQueue() {
return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
}
public float getLocalityThreshold() {
return getFloat(LOCALITY_THRESHOLD, DEFAULT_LOCALITY_THRESHOLD);
}
public float getLocalityThresholdNode() {
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
}
public float getLocalityThresholdRack() {
return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
}
public boolean getPreemptionEnabled() {
return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
}
public boolean getAssignMultiple() {
return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
}
public int getMaxAssign() {
return getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN);
}
public boolean getSizeBasedWeight() {
return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
}
public String getAllocationFile() {
return get(ALLOCATION_FILE);
}
public String getEventlogDir() {
return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.DailyRollingFileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.spi.LoggingEvent;
/**
* Event log used by the fair scheduler for machine-readable debug info.
* This class uses a log4j rolling file appender to write the log, but uses
* a custom tab-separated event format of the form:
* <pre>
* DATE EVENT_TYPE PARAM_1 PARAM_2 ...
* </pre>
* Various event types are used by the fair scheduler. The purpose of logging
* in this format is to enable tools to parse the history log easily and read
* internal scheduler variables, rather than trying to make the log human
* readable. The fair scheduler also logs human readable messages in the
* JobTracker's main log.
*
* Constructing this class creates a disabled log. It must be initialized
* using {@link FairSchedulerEventLog#init(Configuration, String)} to begin
* writing to the file.
*/
@Private
@Unstable
class FairSchedulerEventLog {
private static final Log LOG = LogFactory.getLog(FairSchedulerEventLog.class.getName());
/** Set to true if logging is disabled due to an error. */
private boolean logDisabled = true;
/**
* Log directory, set by mapred.fairscheduler.eventlog.location in conf file;
* defaults to {hadoop.log.dir}/fairscheduler.
*/
private String logDir;
/**
* Active log file, which is {LOG_DIR}/hadoop-{user}-fairscheduler.log.
* Older files are also stored as {LOG_FILE}.date (date format YYYY-MM-DD).
*/
private String logFile;
/** Log4j appender used to write to the log file */
private DailyRollingFileAppender appender;
boolean init(FairSchedulerConfiguration conf) {
try {
logDir = conf.getEventlogDir();
Path logDirPath = new Path(logDir);
FileSystem fs = logDirPath.getFileSystem(conf);
if (!fs.exists(logDirPath)) {
if (!fs.mkdirs(logDirPath)) {
throw new IOException(
"Mkdirs failed to create " + logDirPath.toString());
}
}
String username = System.getProperty("user.name");
logFile = String.format("%s%shadoop-%s-fairscheduler.log",
logDir, File.separator, username);
logDisabled = false;
PatternLayout layout = new PatternLayout("%d{ISO8601}\t%m%n");
appender = new DailyRollingFileAppender(layout, logFile, "'.'yyyy-MM-dd");
appender.activateOptions();
LOG.info("Initialized fair scheduler event log, logging to " + logFile);
} catch (IOException e) {
LOG.error(
"Failed to initialize fair scheduler event log. Disabling it.", e);
logDisabled = true;
}
return !(logDisabled);
}
/**
* Log an event, writing a line in the log file of the form
* <pre>
* DATE EVENT_TYPE PARAM_1 PARAM_2 ...
* </pre>
*/
synchronized void log(String eventType, Object... params) {
try {
if (logDisabled)
return;
StringBuffer buffer = new StringBuffer();
buffer.append(eventType);
for (Object param: params) {
buffer.append("\t");
buffer.append(param);
}
String message = buffer.toString();
Logger logger = Logger.getLogger(getClass());
appender.append(new LoggingEvent("", logger, Level.INFO, message, null));
} catch (Exception e) {
LOG.error("Failed to append to fair scheduler event log", e);
logDisabled = true;
}
}
/**
* Flush and close the log.
*/
void shutdown() {
try {
if (appender != null)
appender.close();
} catch (Exception e) {}
logDisabled = true;
}
boolean isEnabled() {
return !logDisabled;
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.util.Comparator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Order {@link AppSchedulable} objects by priority and then by submit time, as
* in the default scheduler in Hadoop.
*/
@Private
@Unstable
public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable {
private static final long serialVersionUID = 3428835083489547918L;
public int compare(AppSchedulable a1, AppSchedulable a2) {
int res = a1.getPriority().compareTo(a2.getPriority());
if (res == 0) {
if (a1.getStartTime() < a2.getStartTime()) {
res = -1;
} else {
res = (a1.getStartTime() == a2.getStartTime() ? 0 : 1);
}
}
if (res == 0) {
// If there is a tie, break it by app ID to get a deterministic order
res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId());
}
return res;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
/**
* A {@link WeightAdjuster} implementation that gives a weight boost to new jobs
* for a certain amount of time -- by default, a 3x weight boost for 60 seconds.
* This can be used to make shorter jobs finish faster, emulating Shortest Job
* First scheduling while not starving long jobs.
*/
@Private
@Unstable
public class NewJobWeightBooster extends Configured implements WeightAdjuster {
private static final float DEFAULT_FACTOR = 3;
private static final long DEFAULT_DURATION = 5 * 60 * 1000;
private float factor;
private long duration;
public void setConf(Configuration conf) {
if (conf != null) {
factor = conf.getFloat("mapred.newjobweightbooster.factor",
DEFAULT_FACTOR);
duration = conf.getLong("mapred.newjobweightbooster.duration",
DEFAULT_DURATION);
}
super.setConf(conf);
}
public double adjustWeight(AppSchedulable app, double curWeight) {
long start = app.getStartTime();
long now = System.currentTimeMillis();
if (now - start < duration) {
return curWeight * factor;
} else {
return curWeight;
}
}
}

View File

@ -0,0 +1,513 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
*/
@Private
@Unstable
public class QueueManager {
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
/** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
/**
* Time to wait after the allocation has been modified before reloading it
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
private final FairScheduler scheduler;
// Minimum resource allocation for each queue
private Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
// Maximum amount of resources per queue
private Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
// Sharing weights for each queue
private Map<String, Double> queueWeights = new HashMap<String, Double>();
// Max concurrent running applications for each queue and for each user; in addition,
// for users that have no max specified, we use the userMaxJobsDefault.
private Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
private Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
private int userMaxAppsDefault = Integer.MAX_VALUE;
private int queueMaxAppsDefault = Integer.MAX_VALUE;
// ACL's for each queue. Only specifies non-default ACL's from configuration.
private Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
// Min share preemption timeout for each queue in seconds. If a job in the queue
// waits this long without receiving its guaranteed share, it is allowed to
// preempt other jobs' tasks.
private Map<String, Long> minSharePreemptionTimeouts =
new HashMap<String, Long>();
// Default min share preemption timeout for queues where it is not set
// explicitly.
private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
// Preemption timeout for jobs below fair share in seconds. If a job remains
// below half its fair share for this long, it is allowed to preempt tasks.
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
private Object allocFile; // Path to XML file containing allocations. This
// is either a URL to specify a classpath resource
// (if the fair-scheduler.xml on the classpath is
// used) or a String to specify an absolute path (if
// mapred.fairscheduler.allocation.file is used).
private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues
private boolean lastReloadAttemptFailed = false;
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
FairSchedulerConfiguration conf = scheduler.getConf();
this.allocFile = conf.getAllocationFile();
if (allocFile == null) {
// No allocation file specified in jobconf. Use the default allocation
// file, fair-scheduler.xml, looking for it on the classpath.
allocFile = new Configuration().getResource("fair-scheduler.xml");
if (allocFile == null) {
LOG.error("The fair scheduler allocation file fair-scheduler.xml was "
+ "not found on the classpath, and no other config file is given "
+ "through mapred.fairscheduler.allocation.file.");
}
}
reloadAllocs();
lastSuccessfulReload = scheduler.getClock().getTime();
lastReloadAttempt = scheduler.getClock().getTime();
// Create the default queue
getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
/**
* Get a queue by name, creating it if necessary
*/
public synchronized FSQueue getQueue(String name) {
FSQueue queue = queues.get(name);
if (queue == null) {
queue = new FSQueue(scheduler, name);
queue.setSchedulingMode(defaultSchedulingMode);
queues.put(name, queue);
}
return queue;
}
/**
* Return whether a queue exists already.
*/
public synchronized boolean exists(String name) {
return queues.containsKey(name);
}
/**
* Get the queue for a given AppSchedulable.
*/
public FSQueue getQueueForApp(AppSchedulable app) {
return this.getQueue(app.getApp().getQueueName());
}
/**
* Reload allocations file if it hasn't been loaded in a while
*/
public void reloadAllocsIfNecessary() {
long time = scheduler.getClock().getTime();
if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
lastReloadAttempt = time;
if (null == allocFile) {
return;
}
try {
// Get last modified time of alloc file depending whether it's a String
// (for a path name) or an URL (for a classloader resource)
long lastModified;
if (allocFile instanceof String) {
File file = new File((String) allocFile);
lastModified = file.lastModified();
} else { // allocFile is an URL
URLConnection conn = ((URL) allocFile).openConnection();
lastModified = conn.getLastModified();
}
if (lastModified > lastSuccessfulReload &&
time > lastModified + ALLOC_RELOAD_WAIT) {
reloadAllocs();
lastSuccessfulReload = time;
lastReloadAttemptFailed = false;
}
} catch (Exception e) {
// Throwing the error further out here won't help - the RPC thread
// will catch it and report it in a loop. Instead, just log it and
// hope somebody will notice from the log.
// We log the error only on the first failure so we don't fill up the
// JobTracker's log with these messages.
if (!lastReloadAttemptFailed) {
LOG.error("Failed to reload fair scheduler config file - " +
"will use existing allocations.", e);
}
lastReloadAttemptFailed = true;
}
}
}
/**
* Updates the allocation list from the allocation config file. This file is
* expected to be in the XML format specified in the design doc.
*
* @throws IOException if the config file cannot be read.
* @throws AllocationConfigurationException if allocations are invalid.
* @throws ParserConfigurationException if XML parser is misconfigured.
* @throws SAXException if config file is malformed.
*/
public void reloadAllocs() throws IOException, ParserConfigurationException,
SAXException, AllocationConfigurationException {
if (allocFile == null) return;
// Create some temporary hashmaps to hold the new allocs, and we only save
// them in our fields if we have parsed the entire allocs file successfully.
Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
Map<String, Double> queueWeights = new HashMap<String, Double>();
Map<String, SchedulingMode> queueModes = new HashMap<String, SchedulingMode>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
// Remember all queue names so we can display them on web UI, etc.
List<String> queueNamesInAllocFile = new ArrayList<String>();
// Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory =
DocumentBuilderFactory.newInstance();
docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc;
if (allocFile instanceof String) {
doc = builder.parse(new File((String) allocFile));
} else {
doc = builder.parse(allocFile.toString());
}
Element root = doc.getDocumentElement();
if (!"allocations".equals(root.getTagName()))
throw new AllocationConfigurationException("Bad fair scheduler config " +
"file: top-level element not <allocations>");
NodeList elements = root.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (!(node instanceof Element))
continue;
Element element = (Element)node;
if ("queue".equals(element.getTagName()) ||
"pool".equals(element.getTagName())) {
String queueName = element.getAttribute("name");
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
queueNamesInAllocFile.add(queueName);
NodeList fields = element.getChildNodes();
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
minQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
maxQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
queueWeights.put(queueName, val);
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
queueModes.put(queueName, parseSchedulingMode(text));
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
} else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
}
}
queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
&& Resources.lessThan(maxQueueResources.get(queueName),
minQueueResources.get(queueName))) {
LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
}
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes();
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element) fieldNode;
if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxApps.put(userName, val);
}
}
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxAppsDefault = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeout = val;
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultMinSharePreemptionTimeout = val;
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;}
else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
defaultSchedulingMode = parseSchedulingMode(text);
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
}
}
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
synchronized(this) {
this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
this.userMaxApps = userMaxApps;
this.queueWeights = queueWeights;
this.userMaxAppsDefault = userMaxAppsDefault;
this.queueMaxAppsDefault = queueMaxAppsDefault;
this.defaultSchedulingMode = defaultSchedulingMode;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
this.queueAcls = queueAcls;
for (String name: queueNamesInAllocFile) {
FSQueue queue = getQueue(name);
if (queueModes.containsKey(name)) {
queue.setSchedulingMode(queueModes.get(name));
} else {
queue.setSchedulingMode(defaultSchedulingMode);
}
}
}
}
private SchedulingMode parseSchedulingMode(String text)
throws AllocationConfigurationException {
text = text.toLowerCase();
if (text.equals("fair")) {
return SchedulingMode.FAIR;
} else if (text.equals("fifo")) {
return SchedulingMode.FIFO;
} else {
throw new AllocationConfigurationException(
"Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
}
}
/**
* Get the minimum resource allocation for the given queue.
* @return the cap set on this queue, or 0 if not set.
*/
public Resource getMinResources(String queue) {
if (minQueueResources.containsKey(queue)) {
return minQueueResources.get(queue);
} else{
return Resources.createResource(0);
}
}
/**
* Get the maximum resource allocation for the given queue.
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
*/
Resource getMaxResources(String queueName) {
if (maxQueueResources.containsKey(queueName)) {
return maxQueueResources.get(queueName);
} else {
return Resources.createResource(Integer.MAX_VALUE);
}
}
/**
* Add an app in the appropriate queue
*/
public synchronized void addApp(FSSchedulerApp app) {
getQueue(app.getQueueName()).addApp(app);
}
/**
* Remove an app
*/
public synchronized void removeJob(SchedulerApp app) {
getQueue(app.getQueueName()).removeJob(app);
}
/**
* Get a collection of all queues
*/
public synchronized Collection<FSQueue> getQueues() {
return queues.values();
}
/**
* Get all queue names that have been seen either in the allocation file or in
* a submitted app.
*/
public synchronized Collection<String> getQueueNames() {
List<String> list = new ArrayList<String>();
for (FSQueue queue: getQueues()) {
list.add(queue.getName());
}
Collections.sort(list);
return list;
}
public int getUserMaxApps(String user) {
if (userMaxApps.containsKey(user)) {
return userMaxApps.get(user);
} else {
return userMaxAppsDefault;
}
}
public int getQueueMaxApps(String queue) {
if (queueMaxApps.containsKey(queue)) {
return queueMaxApps.get(queue);
} else {
return queueMaxAppsDefault;
}
}
public double getQueueWeight(String queue) {
if (queueWeights.containsKey(queue)) {
return queueWeights.get(queue);
} else {
return 1.0;
}
}
/**
* Get a queue's min share preemption timeout, in milliseconds. This is the
* time after which jobs in the queue may kill other queues' tasks if they
* are below their min share.
*/
public long getMinSharePreemptionTimeout(String queueName) {
if (minSharePreemptionTimeouts.containsKey(queueName)) {
return minSharePreemptionTimeouts.get(queueName);
}
return defaultMinSharePreemptionTimeout;
}
/**
* Get the fair share preemption, in milliseconds. This is the time
* after which any job may kill other jobs' tasks if it is below half
* its fair share.
*/
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
/**
* Get the ACLs associated with this queue. If a given ACL is not explicitly
* configured, include the default value for that ACL.
*/
public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
if (queueAcls.containsKey(queue)) {
out.putAll(queueAcls.get(queue));
}
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
}
if (!out.containsKey(QueueACL.SUBMIT_APPLICATIONS)) {
out.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList("*"));
}
return out;
}
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* A Schedulable represents an entity that can launch tasks, such as a job
* or a queue. It provides a common interface so that algorithms such as fair
* sharing can be applied both within a queue and across queues. There are
* currently two types of Schedulables: JobSchedulables, which represent a
* single job, and QueueSchedulables, which allocate among jobs in their queue.
*
* Separate sets of Schedulables are used for maps and reduces. Each queue has
* both a mapSchedulable and a reduceSchedulable, and so does each job.
*
* A Schedulable is responsible for three roles:
* 1) It can launch tasks through assignTask().
* 2) It provides information about the job/queue to the scheduler, including:
* - Demand (maximum number of tasks required)
* - Number of currently running tasks
* - Minimum share (for queues)
* - Job/queue weight (for fair sharing)
* - Start time and priority (for FIFO)
* 3) It can be assigned a fair share, for use with fair scheduling.
*
* Schedulable also contains two methods for performing scheduling computations:
* - updateDemand() is called periodically to compute the demand of the various
* jobs and queues, which may be expensive (e.g. jobs must iterate through all
* their tasks to count failed tasks, tasks that can be speculated, etc).
* - redistributeShare() is called after demands are updated and a Schedulable's
* fair share has been set by its parent to let it distribute its share among
* the other Schedulables within it (e.g. for queues that want to perform fair
* sharing among their jobs).
*/
@Private
@Unstable
abstract class Schedulable {
/** Fair share assigned to this Schedulable */
private Resource fairShare = Resources.createResource(0);
/**
* Name of job/queue, used for debugging as well as for breaking ties in
* scheduling order deterministically.
*/
public abstract String getName();
/**
* Maximum number of resources required by this Schedulable. This is defined as
* number of currently utilized resources + number of unlaunched resources (that
* are either not yet launched or need to be speculated).
*/
public abstract Resource getDemand();
/** Get the aggregate amount of resources consumed by the schedulable. */
public abstract Resource getResourceUsage();
/** Minimum Resource share assigned to the schedulable. */
public abstract Resource getMinShare();
/** Job/queue weight in fair sharing. */
public abstract double getWeight();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public abstract long getStartTime();
/** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
public abstract Priority getPriority();
/** Refresh the Schedulable's demand and those of its children if any. */
public abstract void updateDemand();
/**
* Distribute the fair share assigned to this Schedulable among its
* children (used in queues where the internal scheduler is fair sharing).
*/
public abstract void redistributeShare();
/**
* Assign a container on this node if possible, and return the amount of
* resources assigned. If {@code reserved} is true, it means a reservation
* already exists on this node, and the schedulable should fulfill that
* reservation if possible.
*/
public abstract Resource assignContainer(SchedulerNode node, boolean reserved);
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare) {
this.fairShare = fairShare;
}
/** Get the fair share assigned to this Schedulable. */
public Resource getFairShare() {
return fairShare;
}
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
}
}

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
/**
* Utility class containing scheduling algorithms used in the fair scheduler.
*/
@Private
@Unstable
class SchedulingAlgorithms {
public static final Log LOG = LogFactory.getLog(
SchedulingAlgorithms.class.getName());
/**
* Compare Schedulables in order of priority and then submission time, as in
* the default FIFO scheduler in Hadoop.
*/
public static class FifoComparator implements Comparator<Schedulable>, Serializable {
private static final long serialVersionUID = -5905036205491177060L;
@Override
public int compare(Schedulable s1, Schedulable s2) {
int res = s1.getPriority().compareTo(s2.getPriority());
if (res == 0) {
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
}
if (res == 0) {
// In the rare case where jobs were submitted at the exact same time,
// compare them by name (which will be the JobID) to get a deterministic
// ordering, so we don't alternately launch tasks from different jobs.
res = s1.getName().compareTo(s2.getName());
}
return res;
}
}
/**
* Compare Schedulables via weighted fair sharing. In addition, Schedulables
* below their min share get priority over those whose min share is met.
*
* Schedulables below their min share are compared by how far below it they
* are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
* and job B has 50 out of a min share of 100, then job B is scheduled next,
* because B is at 50% of its min share and A is at 80% of its min share.
*
* Schedulables above their min share are compared by (runningTasks / weight).
* If all weights are equal, slots are given to the job with the fewest tasks;
* otherwise, jobs with more weight get proportionally more slots.
*/
public static class FairShareComparator implements Comparator<Schedulable>, Serializable {
private static final long serialVersionUID = 5564969375856699313L;
@Override
public int compare(Schedulable s1, Schedulable s2) {
double minShareRatio1, minShareRatio2;
double useToWeightRatio1, useToWeightRatio2;
Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand());
Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand());
boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
Resource one = Resources.createResource(1);
minShareRatio1 = (double) s1.getResourceUsage().getMemory() /
Resources.max(minShare1, one).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory() /
Resources.max(minShare2, one).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
int res = 0;
if (s1Needy && !s2Needy)
res = -1;
else if (s2Needy && !s1Needy)
res = 1;
else if (s1Needy && s2Needy)
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
else // Neither schedulable is needy
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0)
res = s1.getName().compareTo(s2.getName());
}
return res;
}
}
/**
* Number of iterations for the binary search in computeFairShares. This is
* equivalent to the number of bits of precision in the output. 25 iterations
* gives precision better than 0.1 slots in clusters with one million slots.
*/
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min shares and demands of the Schedulables are assumed to
* be set beforehand. We compute the fairest possible allocation of shares
* to the Schedulables that respects their min shares and demands.
*
* To understand what this method does, we must first define what weighted
* fair sharing means in the presence of minimum shares and demands. If there
* were no minimum shares and every Schedulable had an infinite demand (i.e.
* could launch infinitely many tasks), then weighted fair sharing would be
* achieved if the ratio of slotsAssigned / weight was equal for each
* Schedulable and all slots were assigned. Minimum shares and demands add
* two further twists:
* - Some Schedulables may not have enough tasks to fill all their share.
* - Some Schedulables may have a min share higher than their assigned share.
*
* To deal with these possibilities, we define an assignment of slots as
* being fair if there exists a ratio R such that:
* - Schedulables S where S.demand < R * S.weight are assigned share S.demand
* - Schedulables S where S.minShare > R * S.weight are given share S.minShare
* - All other Schedulables S are assigned share R * S.weight
* - The sum of all the shares is totalSlots.
*
* We call R the weight-to-slots ratio because it converts a Schedulable's
* weight to the number of slots it is assigned.
*
* We compute a fair allocation by finding a suitable weight-to-slot ratio R.
* To do this, we use binary search. Given a ratio R, we compute the number
* of slots that would be used in total with this ratio (the sum of the shares
* computed using the conditions above). If this number of slots is less than
* totalSlots, then R is too small and more slots could be assigned. If the
* number of slots is more than totalSlots, then R is too large.
*
* We begin the binary search with a lower bound on R of 0 (which means that
* all Schedulables are only given their minShare) and an upper bound computed
* to be large enough that too many slots are given (by doubling R until we
* either use more than totalSlots slots or we fulfill all jobs' demands).
* The helper method slotsUsedWithWeightToSlotRatio computes the total number
* of slots used with a given value of R.
*
* The running time of this algorithm is linear in the number of Schedulables,
* because slotsUsedWithWeightToSlotRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
public static void computeFairShares(
Collection<? extends Schedulable> schedulables, Resource totalResources) {
// Find an upper bound on R that we can use in our binary search. We start
// at R = 1 and double it until we have either used totalSlots slots or we
// have met all Schedulables' demands (if total demand < totalSlots).
Resource totalDemand = Resources.createResource(0);
for (Schedulable sched: schedulables) {
Resources.addTo(totalDemand, sched.getDemand());
}
Resource cap = Resources.min(totalDemand, totalResources);
double rMax = 1.0;
while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), cap)) {
rMax *= 2.0;
}
// Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
double left = 0;
double right = rMax;
for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
double mid = (left + right) / 2.0;
if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), cap)) {
left = mid;
} else {
right = mid;
}
}
// Set the fair shares based on the value of R we've converged to
for (Schedulable sched: schedulables) {
sched.setFairShare(computeShare(sched, right));
}
}
/**
* Compute the number of slots that would be used given a weight-to-slot
* ratio w2sRatio, for use in the computeFairShares algorithm as described
* in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
*/
private static Resource resUsedWithWeightToResRatio(double w2sRatio,
Collection<? extends Schedulable> schedulables) {
Resource slotsTaken = Resources.createResource(0);
for (Schedulable sched: schedulables) {
Resource share = computeShare(sched, w2sRatio);
Resources.addTo(slotsTaken, share);
}
return slotsTaken;
}
/**
* Compute the resources assigned to a Schedulable given a particular
* res-to-slot ratio r2sRatio, for use in computeFairShares as described
* in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
*/
private static Resource computeShare(Schedulable sched, double r2sRatio) {
double share = sched.getWeight() * r2sRatio;
share = Math.max(share, sched.getMinShare().getMemory());
share = Math.min(share, sched.getDemand().getMemory());
return Resources.createResource((int) share);
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Internal scheduling modes for queues.
*/
@Private
@Unstable
public enum SchedulingMode {
FAIR, FIFO
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
/**
* A pluggable object for altering the weights of apps in the fair scheduler,
* which is used for example by {@link NewJobWeightBooster} to give higher
* weight to new jobs so that short jobs finish faster.
*
* May implement {@link Configurable} to access configuration parameters.
*/
@Private
@Unstable
public interface WeightAdjuster {
public double adjustWeight(AppSchedulable app, double curWeight);
}

View File

@ -18,14 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.QUEUE_NAME;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.QUEUE_NAME;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.Controller;
import org.apache.hadoop.yarn.webapp.WebAppException;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import com.google.inject.Inject;
@ -71,6 +74,12 @@ public class RmController extends Controller {
render(CapacitySchedulerPage.class);
return;
}
if (rs instanceof FairScheduler) {
context().setStatus(404);
throw new WebAppException("Fair Scheduler UI not yet supported");
}
setTitle("Default Scheduler");
render(DefaultSchedulerPage.class);
}

View File

@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSSchedulerApp {
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class);
appIdImpl.setId(appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
return attId;
}
@Test
public void testDelayScheduling() {
Queue queue = Mockito.mock(Queue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
double nodeLocalityThreshold = .5;
double rackLocalityThreshold = .6;
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
// First five scheduling opportunities should remain node local
for (int i = 0; i < 5; i++) {
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
// After five it should switch to rack local
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
// Manually set back to node local
schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
schedulerApp.resetSchedulingOpportunities(prio);
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
// Now escalate again to rack-local, then to off-switch
for (int i = 0; i < 5; i++) {
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
for (int i = 0; i < 6; i++) {
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
@Test
/**
* Ensure that when negative paramaters are given (signaling delay scheduling
* no tin use), the least restrictive locality level is returned.
*/
public void testLocalityLevelWithoutDelays() {
Queue queue = Mockito.mock(Queue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0));
}
}

View File

@ -0,0 +1,992 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestFairScheduler {
private class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
return time;
}
public void tick(int seconds) {
time = time + seconds * 1000;
}
}
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"/tmp")).getAbsolutePath();
final static String ALLOC_FILE = new File(TEST_DIR,
"test-queues").getAbsolutePath();
private FairScheduler scheduler;
private ResourceManager resourceManager;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private int APP_ID = 1; // Incrementing counter for schedling apps
private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
// HELPER METHODS
@Before
public void setUp() throws IOException {
scheduler = new FairScheduler();
Configuration conf = new Configuration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
Store store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store);
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
}
@After
public void tearDown() {
scheduler = null;
resourceManager = null;
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class);
appIdImpl.setId(appId);
attId.setAttemptId(attemptId);
attId.setApplicationId(appIdImpl);
return attId;
}
private ResourceRequest createResourceRequest(int memory, String host, int priority, int numContainers) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
request.setCapability(Resources.createResource(memory));
request.setHostName(host);
request.setNumContainers(numContainers);
Priority prio = recordFactory.newRecordInstance(Priority.class);
prio.setPriority(priority);
request.setPriority(prio);
return request;
}
/**
* Creates a single container priority-1 request and submits to
* scheduler.
*/
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId) {
return createSchedulingRequest(memory, queueId, userId, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers) {
return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers, int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id, queueId, userId);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, "*", priority, numContainers);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
return id;
}
// TESTS
@Test
public void testAggregateCapacityTracking() throws Exception {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
assertEquals(1024, scheduler.getClusterCapacity().getMemory());
// Add another node
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
assertEquals(1536, scheduler.getClusterCapacity().getMemory());
// Remove the first node
NodeRemovedSchedulerEvent nodeEvent3 = new NodeRemovedSchedulerEvent(node1);
scheduler.handle(nodeEvent3);
assertEquals(512, scheduler.getClusterCapacity().getMemory());
}
@Test
public void testSimpleFairShareCalculation() {
// Add one big node (only care about aggregate capacity)
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Have two queues which want entire cluster capacity
createSchedulingRequest(10 * 1024, "queue1", "user1");
createSchedulingRequest(10 * 1024, "queue2", "user1");
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
assertEquals(3, queues.size());
for (FSQueue p : queues) {
if (p.getName() != "default") {
assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory());
}
}
}
@Test
public void testSimpleContainerAllocation() {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Add another node
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(512));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
createSchedulingRequest(512, "queue1", "user1", 2);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
scheduler.handle(updateEvent);
assertEquals(512, scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
scheduler.handle(updateEvent2);
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory());
}
@Test
public void testSimpleContainerReservation() throws InterruptedException {
// Add a node
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue 1 requests full capacity of node
createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
scheduler.handle(updateEvent);
// Make sure queue 1 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory());
// Now queue 2 requests likewise
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
getQueueSchedulable().getResourceUsage().getMemory());
assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
// Now another node checks in with capacity
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
scheduler.handle(nodeEvent2);
scheduler.handle(updateEvent2);
// Make sure this goes to queue 2
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
getQueueSchedulable().getResourceUsage().getMemory());
// The old reservation should still be there...
assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
// ... but it should disappear when we update the first node.
scheduler.handle(updateEvent);
assertEquals(0, scheduler.applications.get(attId).getCurrentReservation().getMemory());
}
@Test
public void testUserAsDefaultQueue() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
createAppAttemptId(1, 1), "default", "user1");
scheduler.handle(appAddedEvent);
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size());
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2);
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size());
assertEquals(0, scheduler.getQueueManager().getQueue("user2").getApplications().size());
}
@Test
public void testFairShareWithMinAlloc() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>1024</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// Add one big node (only care about aggregate capacity)
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
createSchedulingRequest(2 * 1024, "queueA", "user1");
createSchedulingRequest(2 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
assertEquals(3, queues.size());
for (FSQueue p : queues) {
if (p.getName().equals("queueA")) {
assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory());
}
else if (p.getName().equals("queueB")) {
assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory());
}
}
}
/**
* Make allocation requests and ensure they are reflected in queue demand.
*/
@Test
public void testQueueDemandCalculation() throws Exception {
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
scheduler.addApplication(id11, "queue1", "user1");
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
scheduler.addApplication(id21, "queue2", "user1");
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
scheduler.addApplication(id22, "queue2", "user1");
// First ask, queue1 requests 1024
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 = createResourceRequest(1024, "*", 1, 1);
ask1.add(request1);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
// Second ask, queue2 requests 1024 + (2 * 512)
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ResourceRequest request2 = createResourceRequest(1024, "foo", 1, 1);
ResourceRequest request3 = createResourceRequest(512, "bar", 1, 2);
ask2.add(request2);
ask2.add(request3);
scheduler.allocate(id21, ask2, new ArrayList<ContainerId>());
// Third ask, queue2 requests 1024
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 = createResourceRequest(1024, "*", 1, 1);
ask3.add(request4);
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
scheduler.update();
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").getQueueSchedulable().getDemand().getMemory());
assertEquals(1024 + 1024 + (2 * 512), scheduler.getQueueManager().getQueue("queue2").getQueueSchedulable().getDemand().getMemory());
}
@Test
public void testAppAdditionAndRemoval() throws Exception {
AppAddedSchedulerEvent appAddedEvent1 = new AppAddedSchedulerEvent(
createAppAttemptId(1, 1), "default", "user1");
scheduler.handle(appAddedEvent1);
// Scheduler should have one queue (the default)
assertEquals(1, scheduler.getQueueManager().getQueues().size());
// That queue should have one app
assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size());
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
// Now remove app
scheduler.handle(appRemovedEvent1);
// Default queue should have no apps
assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size());
}
@Test
public void testAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<queue name=\"queueA\">");
out.println("<minResources>1024</minResources>");
out.println("</queue>");
// Give queue B a minimum of 2048 M
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</queue>");
// Give queue C no minimum
out.println("<queue name=\"queueC\">");
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
out.println("</queue>");
// Give queue D a limit of 3 running apps
out.println("<queue name=\"queueD\">");
out.println("<maxRunningApps>3</maxRunningApps>");
out.println("</queue>");
// Give queue E a preemption timeout of one minute
out.println("<queue name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("</queue>");
// Set default limit of apps per queue to 15
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of apps per user to 5
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
// Give user1 a limit of 10 jobs
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>10</maxRunningApps>");
out.println("</user>");
// Set default min share preemption timeout to 2 minutes
out.println("<defaultMinSharePreemptionTimeout>120"
+ "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024),
queueManager.getMinResources("queueA"));
assertEquals(Resources.createResource(2048),
queueManager.getMinResources("queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueC"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueD"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueE"));
assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("queueA"));
assertEquals(15, queueManager.getQueueMaxApps("queueB"));
assertEquals(15, queueManager.getQueueMaxApps("queueC"));
assertEquals(3, queueManager.getQueueMaxApps("queueD"));
assertEquals(15, queueManager.getQueueMaxApps("queueE"));
assertEquals(10, queueManager.getUserMaxApps("user1"));
assertEquals(5, queueManager.getUserMaxApps("user2"));
// Unspecified queues should get default ACL
Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA");
assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue c ACL
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
}
@Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
// Give queue A a minimum of 1024 M
out.println("<pool name=\"queueA\">");
out.println("<minResources>1024</minResources>");
out.println("</pool>");
// Give queue B a minimum of 2048 M
out.println("<pool name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>");
out.println("</pool>");
// Give queue C no minimum
out.println("<pool name=\"queueC\">");
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>");
out.println("</pool>");
// Give queue D a limit of 3 running apps
out.println("<pool name=\"queueD\">");
out.println("<maxRunningApps>3</maxRunningApps>");
out.println("</pool>");
// Give queue E a preemption timeout of one minute
out.println("<pool name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("</pool>");
// Set default limit of apps per queue to 15
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of apps per user to 5
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>");
// Give user1 a limit of 10 jobs
out.println("<user name=\"user1\">");
out.println("<maxRunningApps>10</maxRunningApps>");
out.println("</user>");
// Set default min share preemption timeout to 2 minutes
out.println("<defaultMinSharePreemptionTimeout>120"
+ "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024),
queueManager.getMinResources("queueA"));
assertEquals(Resources.createResource(2048),
queueManager.getMinResources("queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueC"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueD"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueE"));
assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("queueA"));
assertEquals(15, queueManager.getQueueMaxApps("queueB"));
assertEquals(15, queueManager.getQueueMaxApps("queueC"));
assertEquals(3, queueManager.getQueueMaxApps("queueD"));
assertEquals(15, queueManager.getQueueMaxApps("queueE"));
assertEquals(10, queueManager.getUserMaxApps("user1"));
assertEquals(5, queueManager.getUserMaxApps("user2"));
// Unspecified queues should get default ACL
Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA");
assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue c ACL
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
}
@Test
public void testIsStarvedForMinShare() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>2048</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// Add one big node (only care about aggregate capacity)
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
assertEquals(3, queues.size());
// Queue A should be above min share, B below.
for (FSQueue p : queues) {
if (p.getName().equals("queueA")) {
assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
}
else if (p.getName().equals("queueB")) {
assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
}
}
// Node checks in again, should allocate for B
scheduler.handle(nodeEvent2);
// Now B should have min share ( = demand here)
for (FSQueue p : queues) {
if (p.getName().equals("queueB")) {
assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
}
}
}
@Test
public void testIsStarvedForFairShare() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.75</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// Add one big node (only care about aggregate capacity)
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
assertEquals(3, queues.size());
// Queue A should be above fair share, B below.
for (FSQueue p : queues) {
if (p.getName().equals("queueA")) {
assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
}
else if (p.getName().equals("queueB")) {
assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
}
}
// Node checks in again, should allocate for B
scheduler.handle(nodeEvent2);
// B should not be starved for fair share, since entire demand is
// satisfied.
for (FSQueue p : queues) {
if (p.getName().equals("queueB")) {
assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
}
}
}
@Test
/**
* Make sure containers are chosen to be preempted in the correct order. Right
* now this means decreasing order of priority.
*/
public void testChoiceOfPreemptedContainers() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// Create four nodes
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
// Queue A and B each request three containers
ApplicationAttemptId app1 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
ApplicationAttemptId app2 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
ApplicationAttemptId app3 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
ApplicationAttemptId app4 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
ApplicationAttemptId app5 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
ApplicationAttemptId app6 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
scheduler.update();
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeUpdate1);
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeUpdate2);
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeUpdate3);
}
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app6).getLiveContainers().size());
// Now new requests arrive from queues C and D
ApplicationAttemptId app7 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
ApplicationAttemptId app8 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
ApplicationAttemptId app9 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
ApplicationAttemptId app10 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
ApplicationAttemptId app11 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
ApplicationAttemptId app12 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
scheduler.update();
// We should be able to claw back one container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueSchedulables(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app5).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
// We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueSchedulables(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
// Now A and B are below fair share, so preemption shouldn't do anything
scheduler.preemptResources(scheduler.getQueueSchedulables(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app3).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app4).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app5).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
}
@Test
/**
* Tests the timing of decision to preempt tasks.
*/
public void testPreemptionDecision() throws Exception {
Configuration conf = new Configuration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
scheduler.setClock(clock);
scheduler.reinitialize(conf, null, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
// Create four nodes
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 = MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024));
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
// Queue A and B each request three containers
ApplicationAttemptId app1 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
ApplicationAttemptId app2 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
ApplicationAttemptId app3 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
ApplicationAttemptId app4 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
ApplicationAttemptId app5 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
ApplicationAttemptId app6 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
scheduler.update();
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeUpdate1);
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeUpdate2);
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3,
new LinkedList<ContainerStatus>(), new LinkedList<ContainerStatus>());
scheduler.handle(nodeUpdate3);
}
// Now new requests arrive from queues C and D
ApplicationAttemptId app7 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
ApplicationAttemptId app8 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
ApplicationAttemptId app9 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
ApplicationAttemptId app10 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
ApplicationAttemptId app11 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
ApplicationAttemptId app12 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
scheduler.update();
FSQueueSchedulable schedC =
scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable();
FSQueueSchedulable schedD =
scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable();
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
// After minSharePreemptionTime has passed, they should want to preempt min
// share.
clock.tick(6);
assertTrue(Resources.equals(
Resources.createResource(1024), scheduler.resToPreempt(schedC, clock.getTime())));
assertTrue(Resources.equals(
Resources.createResource(1024), scheduler.resToPreempt(schedD, clock.getTime())));
// After fairSharePreemptionTime has passed, they should want to preempt
// fair share.
scheduler.update();
clock.tick(6);
assertTrue(Resources.equals(
Resources.createResource(1536), scheduler.resToPreempt(schedC, clock.getTime())));
assertTrue(Resources.equals(
Resources.createResource(1536), scheduler.resToPreempt(schedD, clock.getTime())));
}
}

View File

@ -0,0 +1,179 @@
~~ Licensed under the Apache License, Version 2.0 (the "License");
~~ you may not use this file except in compliance with the License.
~~ You may obtain a copy of the License at
~~
~~ http://www.apache.org/licenses/LICENSE-2.0
~~
~~ Unless required by applicable law or agreed to in writing, software
~~ distributed under the License is distributed on an "AS IS" BASIS,
~~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~~ See the License for the specific language governing permissions and
~~ limitations under the License. See accompanying LICENSE file.
---
Hadoop Map Reduce Next Generation-${project.version} - Fair Scheduler
---
---
${maven.build.timestamp}
Hadoop MapReduce Next Generation - Fair Scheduler
\[ {{{./index.html}Go Back}} \]
%{toc|section=1|fromDepth=0}
* {Purpose}
This document describes the <<<FairScheduler>>>, a pluggable scheduler for Hadoop
which provides a way to share large clusters. <<NOTE:>> The Fair Scheduler
implementation is currently under development and should be considered experimental.
* {Introduction}
Fair scheduling is a method of assigning resources to applications such that
all apps get, on average, an equal share of resources over time.
Hadoop NextGen is capable of scheduling multiple resource types, such as
Memory and CPU. Currently only memory is supported, so a "cluster share" is
a proportion of aggregate memory in the cluster. When there is a single app
running, that app uses the entire cluster. When other apps are submitted,
resources that free up are assigned to the new apps, so that each app gets
roughly the same amount of resources. Unlike the default Hadoop scheduler,
which forms a queue of apps, this lets short apps finish in reasonable time
while not starving long-lived apps. It is also a reasonable way to share a
cluster between a number of users. Finally, fair sharing can also work with
app priorities - the priorities are used as weights to determine the
fraction of total resources that each app should get.
The scheduler organizes apps further into "queues", and shares resources
fairly between these queues. By default, all users share a single queue,
called “default”. If an app specifically lists a queue in a container
resource request, the request is submitted to that queue. It is also
possible to assign queues based on the user name included with the request
through configuration. Within each queue, fair sharing is used to share
capacity between the running apps. queues can also be given weights to share
the cluster non-proportionally in the config file.
In addition to providing fair sharing, the Fair Scheduler allows assigning
guaranteed minimum shares to queues, which is useful for ensuring that
certain users, groups or production applications always get sufficient
resources. When a queue contains apps, it gets at least its minimum share,
but when the queue does not need its full guaranteed share, the excess is
split between other running apps. This lets the scheduler guarantee capacity
for queues while utilizing resources efficiently when these queues don't
contain applications.
The Fair Scheduler lets all apps run by default, but it is also possible to
limit the number of running apps per user and per queue through the config
file. This can be useful when a user must submit hundreds of apps at once,
or in general to improve performance if running too many apps at once would
cause too much intermediate data to be created or too much context-switching.
Limiting the apps does not cause any subsequently submitted apps to fail,
only to wait in the scheduler's queue until some of the user's earlier apps
finish. apps to run from each user/queue are chosen in order of priority and
then submit time, as in the default FIFO scheduler in Hadoop.
Certain add-ons are not yet supported which existed in the original (MR1)
Fair Scheduler. Among them, is the use of a custom policies governing
priority “boosting” over certain apps.
* {Installation}
To use the Fair Scheduler first assign the appropriate scheduler class in
yarn-site.xml:
------
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>
------
* {Configuration}
Customizing the Fair Scheduler typically involves altering two files. First,
scheduler-wide options can be set by adding configuration properties in the
fair-scheduler.xml file in your existing configuration directory. Second, in
most cases users will want to create a manifest file listing which queues
exist and their respective weights and capacities. The location of this file
is flexible - but it must be declared in fair-scheduler.xml.
* <<<yarn.scheduler.fair.allocation.file>>>
* Path to allocation file. An allocation file is an XML manifest describing
queues and their properties, in addition to certain policy defaults. This file
must be in XML format as described in the next section.
* <<<yarn.scheduler.fair.minimum-allocation-mb>>>
* The smallest container size the scheduler can allocate, in MB of memory.
* <<<yarn.scheduler.fair.minimum-allocation-mb>>>
* The largest container the scheduler can allocate, in MB of memory.
* <<<yarn.scheduler.fair.user-as-default-queue>>>
* Whether to use the username associated with the allocation as the default
queue name, in the event that a queue name is not specified. If this is set
to "false" or unset, all jobs have a shared default queue, called "default".
* <<<yarn.scheduler.fair.preemption>>>
* Whether to use preemption. Note that preemption is experimental in the current
version.
* <<<yarn.scheduler.fair.sizebasedweight>>>
* Whether to assign shares to individual apps based on their size, rather than
providing an equal share to all apps regardless of size.
* <<<yarn.scheduler.fair.assignmultiple>>>
* Whether to allow multiple container assignments in one heartbeat.
Allocation file format
The allocation file must be in XML format. The format contains three types of
elements:
* <<Queue elements>>, which represent queues. Each may contain the following
properties:
* minResources: minimum amount of aggregate memory
* maxResources: maximum amount of aggregate memory
* maxRunningApps: limit the number of apps from the queue to run at once
* weight: to share the cluster non-proportionally with other queues
* schedulingMode: either "fifo" or "fair" depending on the in-queue scheduling
policy desired
* <<User elements>>, which represent settings governing the behavior of individual
users. They can contain a single property: maxRunningApps, a limit on the
number of running apps for a particular user.
* <<A userMaxAppsDefault element>>, which sets the default running app limit
for any users whose limit is not otherwise specified.
An example allocation file is given here:
---
<?xml version="1.0"?>
<allocations>
<queue name="sample_queue">
<minResources>100000</minResources>
<maxResources>900000</maxResources>
<maxRunningApps>50</maxRunningApps>
<weight>2.0</weight>
<schedulingMode>fair</schedulingMode>
</queue>
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>5</userMaxAppsDefault>
</allocations>
---
Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.