YARN-2399. FairScheduler: Merge AppSchedulable and FSSchedulerApp into FSAppAttempt. (kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1617623 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-08-12 23:04:06 +00:00
parent b929f4a5b0
commit d0c3ca05de
21 changed files with 1034 additions and 1098 deletions

View File

@ -22,10 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.AppSchedulable;
.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import com.codahale.metrics.Gauge;
import org.apache.hadoop.yarn.sls.SLSRunner;
@ -66,8 +65,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
public void trackApp(ApplicationAttemptId appAttemptId, String oldAppId) {
super.trackApp(appAttemptId, oldAppId);
FairScheduler fair = (FairScheduler) scheduler;
final AppSchedulable app = fair.getSchedulerApp(appAttemptId)
.getAppSchedulable();
final FSAppAttempt app = fair.getSchedulerApp(appAttemptId);
metrics.register("variable.app." + oldAppId + ".demand.memory",
new Gauge<Integer>() {
@Override

View File

@ -104,6 +104,9 @@ Release 2.6.0 - UNRELEASED
YARN-2317. Updated the document about how to write YARN applications. (Li Lu via
zjshen)
YARN-2399. FairScheduler: Merge AppSchedulable and FSSchedulerApp into
FSAppAttempt. (kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -1,463 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class AppSchedulable extends Schedulable {
private static final DefaultResourceCalculator RESOURCE_CALCULATOR
= new DefaultResourceCalculator();
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
private long startTime;
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
private FSLeafQueue queue;
private RMContainerTokenSecretManager containerTokenSecretManager;
private Priority priority;
private ResourceWeights resourceWeights;
private RMContainerComparator comparator = new RMContainerComparator();
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
this.startTime = scheduler.getClock().getTime();
this.queue = queue;
this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager();
this.priority = Priority.newInstance(1);
this.resourceWeights = new ResourceWeights();
}
@Override
public String getName() {
return app.getApplicationId().toString();
}
public FSSchedulerApp getApp() {
return app;
}
public ResourceWeights getResourceWeights() {
return resourceWeights;
}
@Override
public void updateDemand() {
demand = Resources.createResource(0);
// Demand is current consumption plus outstanding requests
Resources.addTo(demand, app.getCurrentConsumption());
// Add up outstanding resource requests
synchronized (app) {
for (Priority p : app.getPriorities()) {
for (ResourceRequest r : app.getResourceRequests(p).values()) {
Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
Resources.addTo(demand, total);
}
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public Resource getResourceUsage() {
// Here the getPreemptedResources() always return zero, except in
// a preemption round
return Resources.subtract(app.getCurrentConsumption(),
app.getPreemptedResources());
}
@Override
public Resource getMinShare() {
return Resources.none();
}
@Override
public Resource getMaxShare() {
return Resources.unbounded();
}
/**
* Get metrics reference from containing queue.
*/
public QueueMetrics getMetrics() {
return queue.getMetrics();
}
@Override
public ResourceWeights getWeights() {
return scheduler.getAppWeight(this);
}
@Override
public Priority getPriority() {
// Right now per-app priorities are not passed to scheduler,
// so everyone has the same priority.
return priority;
}
/**
* Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and
* priority.
*/
public Container createContainer(
FSSchedulerApp application, FSSchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
// Create the container
Container container =
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
return container;
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply
* update relevant bookeeping. This dispatches ro relevant handlers
* in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
*/
private void reserve(Priority priority, FSSchedulerNode node,
Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + app.getApplicationId());
if (!alreadyReserved) {
getMetrics().reserveResource(app.getUser(), container.getResource());
RMContainer rmContainer = app.reserve(node, priority, null,
container);
node.reserveResource(app, priority, rmContainer);
}
else {
RMContainer rmContainer = node.getReservedContainer();
app.reserve(node, priority, rmContainer, container);
node.reserveResource(app, priority, rmContainer);
}
}
/**
* Remove the reservation on {@code node} at the given
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
* handlers for an unreservation.
*/
public void unreserve(Priority priority, FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
app.unreserve(node, priority);
node.unreserveResource(app);
getMetrics().unreserveResource(
app.getUser(), rmContainer.getContainer().getResource());
}
/**
* Assign a container to this node to facilitate {@code request}. If node does
* not have enough memory, create a reservation. This is called once we are
* sure the particular request should be facilitated by this node.
*
* @param node
* The node to try placing the container on.
* @param priority
* The requested priority for the container.
* @param request
* The ResourceRequest we're trying to satisfy.
* @param type
* The locality of the assignment.
* @param reserved
* Whether there's already a container reserved for this app on the node.
* @return
* If an assignment was made, returns the resources allocated to the
* container. If a reservation was made, returns
* FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was
* made, returns an empty resource.
*/
private Resource assignContainer(FSSchedulerNode node,
ResourceRequest request, NodeType type,
boolean reserved) {
// How much does this request need?
Resource capability = request.getCapability();
// How much does the node have?
Resource available = node.getAvailableResource();
Container container = null;
if (reserved) {
container = node.getReservedContainer().getContainer();
} else {
container = createContainer(app, node, capability, request.getPriority());
}
// Can we allocate a container on this node?
if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
app.allocate(type, node, request.getPriority(), request, container);
if (allocatedContainer == null) {
// Did the application need this resource?
if (reserved) {
unreserve(request.getPriority(), node);
}
return Resources.none();
}
// If we had previously made a reservation, delete it
if (reserved) {
unreserve(request.getPriority(), node);
}
// Inform the node
node.allocateContainer(allocatedContainer);
// If this container is used to run AM, update the leaf queue's AM usage
if (app.getLiveContainers().size() == 1 &&
!app.getUnmanagedAM()) {
queue.addAMResourceUsage(container.getResource());
app.setAmRunning(true);
}
return container.getResource();
} else {
// The desired container won't fit here, so reserve
reserve(request.getPriority(), node, container, reserved);
return FairScheduler.CONTAINER_RESERVED;
}
}
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
}
Collection<Priority> prioritiesToTry = (reserved) ?
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
app.getPriorities();
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
synchronized (app) {
for (Priority priority : prioritiesToTry) {
if (app.getTotalRequiredResources(priority) <= 0 ||
!hasContainerForNode(priority, node)) {
continue;
}
app.addSchedulingOpportunity(priority);
// Check the AM resource usage for the leaf queue
if (app.getLiveContainers().size() == 0
&& !app.getUnmanagedAM()) {
if (!queue.canRunAppAM(app.getAMResource())) {
return Resources.none();
}
}
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,
node.getNodeName());
if (localRequest != null && !localRequest.getRelaxLocality()) {
LOG.warn("Relax locality off is not supported on local request: "
+ localRequest);
}
NodeType allowedLocality;
if (scheduler.isContinuousSchedulingEnabled()) {
allowedLocality = app.getAllowedLocalityLevelByTime(priority,
scheduler.getNodeLocalityDelayMs(),
scheduler.getRackLocalityDelayMs(),
scheduler.getClock().getTime());
} else {
allowedLocality = app.getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(),
scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, localRequest,
NodeType.NODE_LOCAL, reserved);
}
if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
continue;
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, rackLocalRequest,
NodeType.RACK_LOCAL, reserved);
}
ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
ResourceRequest.ANY);
if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
continue;
}
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, offSwitchRequest,
NodeType.OFF_SWITCH, reserved);
}
}
}
return Resources.none();
}
/**
* Called when this application already has an existing reservation on the
* given node. Sees whether we can turn the reservation into an allocation.
* Also checks whether the application needs the reservation anymore, and
* releases it if not.
*
* @param node
* Node that the application has an existing reservation on
*/
public Resource assignReservedContainer(FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
Priority priority = rmContainer.getReservedPriority();
// Make sure the application still needs requests at this priority
if (app.getTotalRequiredResources(priority) == 0) {
unreserve(priority, node);
return Resources.none();
}
// Fail early if the reserved container won't fit.
// Note that we have an assumption here that there's only one container size
// per priority.
if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
node.getAvailableResource())) {
return Resources.none();
}
return assignContainer(node, true);
}
@Override
public Resource assignContainer(FSSchedulerNode node) {
return assignContainer(node, false);
}
/**
* Preempt a running container according to the priority
*/
@Override
public RMContainer preemptContainer() {
if (LOG.isDebugEnabled()) {
LOG.debug("App " + getName() + " is going to preempt a running " +
"container");
}
RMContainer toBePreempted = null;
for (RMContainer container : app.getLiveContainers()) {
if (! app.getPreemptionContainers().contains(container) &&
(toBePreempted == null ||
comparator.compare(toBePreempted, container) > 0)) {
toBePreempted = container;
}
}
return toBePreempted;
}
/**
* Whether this app has containers requests that could be satisfied on the
* given node, if the node had full space.
*/
public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getNodeName());
return
// There must be outstanding requests at the given priority:
anyRequest != null && anyRequest.getNumContainers() > 0 &&
// If locality relaxation is turned off at *-level, there must be a
// non-zero request for the node's rack:
(anyRequest.getRelaxLocality() ||
(rackRequest != null && rackRequest.getNumContainers() > 0)) &&
// If locality relaxation is turned off at rack-level, there must be a
// non-zero request at the node:
(rackRequest == null || rackRequest.getRelaxLocality() ||
(nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
// The requested container must be able to fit on the node:
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
anyRequest.getCapability(), node.getRMNode().getTotalCapability());
}
static class RMContainerComparator implements Comparator<RMContainer>,
Serializable {
@Override
public int compare(RMContainer c1, RMContainer c2) {
int ret = c1.getContainer().getPriority().compareTo(
c2.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
}
}
}

View File

@ -0,0 +1,768 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Represents an application attempt from the viewpoint of the Fair Scheduler.
*/
@Private
@Unstable
public class FSAppAttempt extends SchedulerApplicationAttempt
implements Schedulable {
private static final Log LOG = LogFactory.getLog(FSAppAttempt.class);
private static final DefaultResourceCalculator RESOURCE_CALCULATOR
= new DefaultResourceCalculator();
private long startTime;
private Priority priority;
private ResourceWeights resourceWeights;
private Resource demand = Resources.createResource(0);
private FairScheduler scheduler;
private Resource fairShare = Resources.createResource(0, 0);
private Resource preemptedResources = Resources.createResource(0);
private RMContainerComparator comparator = new RMContainerComparator();
private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To acheive this
* we first only allow node-local assigments for a given prioirty level,
* then relax the locality threshold once we've had a long enough period
* without succesfully scheduling. We measure both the number of "missed"
* scheduling opportunities since the last container was scheduled
* at the current allowed level and the time since the last container
* was scheduled. Currently we use only the former.
*/
private final Map<Priority, NodeType> allowedLocalityLevel =
new HashMap<Priority, NodeType>();
public FSAppAttempt(FairScheduler scheduler,
ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue,
ActiveUsersManager activeUsersManager, RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
this.scheduler = scheduler;
this.startTime = scheduler.getClock().getTime();
this.priority = Priority.newInstance(1);
this.resourceWeights = new ResourceWeights();
}
public ResourceWeights getResourceWeights() {
return resourceWeights;
}
/**
* Get metrics reference from containing queue.
*/
public QueueMetrics getMetrics() {
return queue.getMetrics();
}
synchronized public void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(
containerId,
containerStatus,
event)
);
LOG.info("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId);
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
}
private synchronized void unreserveInternal(
Priority priority, FSSchedulerNode node) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(priority);
}
// Reset the re-reservation count
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size() + " at priority "
+ priority + "; currentReservation " + currentReservation);
}
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(priority).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
/**
* Return the level at which we are allowed to schedule containers, given the
* current size of the cluster and thresholds indicating how many nodes to
* fail at (as a fraction of cluster size) before relaxing scheduling
* constraints.
*/
public synchronized NodeType getAllowedLocalityLevel(Priority priority,
int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
// upper limit on threshold
if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
// If delay scheduling is not being used, can schedule anywhere
if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
return NodeType.OFF_SWITCH;
}
// Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// If level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold.
if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority);
}
else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority);
}
}
return allowedLocalityLevel.get(priority);
}
/**
* Return the level at which we are allowed to schedule containers.
* Given the thresholds indicating how much time passed before relaxing
* scheduling constraints.
*/
public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
long nodeLocalityDelayMs, long rackLocalityDelayMs,
long currentTimeMs) {
// if not being used, can schedule anywhere
if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
return NodeType.OFF_SWITCH;
}
// default level is NODE_LOCAL
if (! allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// if level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH;
}
// check waiting time
long waitTime = currentTimeMs;
if (lastScheduledContainer.containsKey(priority)) {
waitTime -= lastScheduledContainer.get(priority);
} else {
waitTime -= getStartTime();
}
long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityDelayMs : rackLocalityDelayMs;
if (waitTime > thresholdTime) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority, currentTimeMs);
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority, currentTimeMs);
}
}
return allowedLocalityLevel.get(priority);
}
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(priority);
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) &&
(type.equals(NodeType.NODE_LOCAL) ||
type.equals(NodeType.RACK_LOCAL))) {
this.resetAllowedLocalityLevel(priority, type);
}
else if (allowed.equals(NodeType.RACK_LOCAL) &&
type.equals(NodeType.NODE_LOCAL)) {
this.resetAllowedLocalityLevel(priority, type);
}
}
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(priority) <= 0) {
return null;
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId()
+ " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId());
return rmContainer;
}
/**
* Should be called when the scheduler assigns a container at a higher
* degree of locality than the current threshold. Reset the allowed locality
* level to a higher degree of locality.
*/
public synchronized void resetAllowedLocalityLevel(Priority priority,
NodeType level) {
NodeType old = allowedLocalityLevel.get(priority);
LOG.info("Raising locality level from " + old + " to " + level + " at " +
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
// related methods
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
preemptionMap.put(container, time);
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
public Long getContainerPreemptionTime(RMContainer container) {
return preemptionMap.get(container);
}
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
@Override
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
}
public Resource getPreemptedResources() {
return preemptedResources;
}
public void resetPreemptedResources() {
preemptedResources = Resources.createResource(0);
for (RMContainer container : getPreemptionContainers()) {
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
}
public void clearPreemptedResources() {
preemptedResources.setMemory(0);
preemptedResources.setVirtualCores(0);
}
/**
* Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and
* priority.
*/
public Container createContainer(
FSSchedulerNode node, Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId = BuilderUtils.newContainerId(
getApplicationAttemptId(), getNewContainerId());
// Create the container
Container container =
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
return container;
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply
* update relevant bookeeping. This dispatches ro relevant handlers
* in {@link FSSchedulerNode}..
*/
private void reserve(Priority priority, FSSchedulerNode node,
Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + getApplicationId());
if (!alreadyReserved) {
getMetrics().reserveResource(getUser(), container.getResource());
RMContainer rmContainer =
super.reserve(node, priority, null, container);
node.reserveResource(this, priority, rmContainer);
} else {
RMContainer rmContainer = node.getReservedContainer();
super.reserve(node, priority, rmContainer, container);
node.reserveResource(this, priority, rmContainer);
}
}
/**
* Remove the reservation on {@code node} at the given {@link Priority}.
* This dispatches SchedulerNode handlers as well.
*/
public void unreserve(Priority priority, FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
unreserveInternal(priority, node);
node.unreserveResource(this);
getMetrics().unreserveResource(
getUser(), rmContainer.getContainer().getResource());
}
/**
* Assign a container to this node to facilitate {@code request}. If node does
* not have enough memory, create a reservation. This is called once we are
* sure the particular request should be facilitated by this node.
*
* @param node
* The node to try placing the container on.
* @param request
* The ResourceRequest we're trying to satisfy.
* @param type
* The locality of the assignment.
* @param reserved
* Whether there's already a container reserved for this app on the node.
* @return
* If an assignment was made, returns the resources allocated to the
* container. If a reservation was made, returns
* FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was
* made, returns an empty resource.
*/
private Resource assignContainer(
FSSchedulerNode node, ResourceRequest request, NodeType type,
boolean reserved) {
// How much does this request need?
Resource capability = request.getCapability();
// How much does the node have?
Resource available = node.getAvailableResource();
Container container = null;
if (reserved) {
container = node.getReservedContainer().getContainer();
} else {
container = createContainer(node, capability, request.getPriority());
}
// Can we allocate a container on this node?
if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
allocate(type, node, request.getPriority(), request, container);
if (allocatedContainer == null) {
// Did the application need this resource?
if (reserved) {
unreserve(request.getPriority(), node);
}
return Resources.none();
}
// If we had previously made a reservation, delete it
if (reserved) {
unreserve(request.getPriority(), node);
}
// Inform the node
node.allocateContainer(allocatedContainer);
// If this container is used to run AM, update the leaf queue's AM usage
if (getLiveContainers().size() == 1 && !getUnmanagedAM()) {
getQueue().addAMResourceUsage(container.getResource());
setAmRunning(true);
}
return container.getResource();
} else {
// The desired container won't fit here, so reserve
reserve(request.getPriority(), node, container, reserved);
return FairScheduler.CONTAINER_RESERVED;
}
}
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
}
Collection<Priority> prioritiesToTry = (reserved) ?
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
getPriorities();
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack of off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
synchronized (this) {
for (Priority priority : prioritiesToTry) {
if (getTotalRequiredResources(priority) <= 0 ||
!hasContainerForNode(priority, node)) {
continue;
}
addSchedulingOpportunity(priority);
// Check the AM resource usage for the leaf queue
if (getLiveContainers().size() == 0 && !getUnmanagedAM()) {
if (!getQueue().canRunAppAM(getAMResource())) {
return Resources.none();
}
}
ResourceRequest rackLocalRequest = getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = getResourceRequest(priority,
node.getNodeName());
if (localRequest != null && !localRequest.getRelaxLocality()) {
LOG.warn("Relax locality off is not supported on local request: "
+ localRequest);
}
NodeType allowedLocality;
if (scheduler.isContinuousSchedulingEnabled()) {
allowedLocality = getAllowedLocalityLevelByTime(priority,
scheduler.getNodeLocalityDelayMs(),
scheduler.getRackLocalityDelayMs(),
scheduler.getClock().getTime());
} else {
allowedLocality = getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(),
scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, localRequest,
NodeType.NODE_LOCAL, reserved);
}
if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
continue;
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, rackLocalRequest,
NodeType.RACK_LOCAL, reserved);
}
ResourceRequest offSwitchRequest =
getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
continue;
}
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, offSwitchRequest,
NodeType.OFF_SWITCH, reserved);
}
}
}
return Resources.none();
}
/**
* Called when this application already has an existing reservation on the
* given node. Sees whether we can turn the reservation into an allocation.
* Also checks whether the application needs the reservation anymore, and
* releases it if not.
*
* @param node
* Node that the application has an existing reservation on
*/
public Resource assignReservedContainer(FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
Priority priority = rmContainer.getReservedPriority();
// Make sure the application still needs requests at this priority
if (getTotalRequiredResources(priority) == 0) {
unreserve(priority, node);
return Resources.none();
}
// Fail early if the reserved container won't fit.
// Note that we have an assumption here that there's only one container size
// per priority.
if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
node.getAvailableResource())) {
return Resources.none();
}
return assignContainer(node, true);
}
/**
* Whether this app has containers requests that could be satisfied on the
* given node, if the node had full space.
*/
public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY);
ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName());
ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName());
return
// There must be outstanding requests at the given priority:
anyRequest != null && anyRequest.getNumContainers() > 0 &&
// If locality relaxation is turned off at *-level, there must be a
// non-zero request for the node's rack:
(anyRequest.getRelaxLocality() ||
(rackRequest != null && rackRequest.getNumContainers() > 0)) &&
// If locality relaxation is turned off at rack-level, there must be a
// non-zero request at the node:
(rackRequest == null || rackRequest.getRelaxLocality() ||
(nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
// The requested container must be able to fit on the node:
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
anyRequest.getCapability(), node.getRMNode().getTotalCapability());
}
static class RMContainerComparator implements Comparator<RMContainer>,
Serializable {
@Override
public int compare(RMContainer c1, RMContainer c2) {
int ret = c1.getContainer().getPriority().compareTo(
c2.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
}
}
/* Schedulable methods implementation */
@Override
public String getName() {
return getApplicationId().toString();
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public Resource getMinShare() {
return Resources.none();
}
@Override
public Resource getMaxShare() {
return Resources.unbounded();
}
@Override
public Resource getResourceUsage() {
// Here the getPreemptedResources() always return zero, except in
// a preemption round
return Resources.subtract(getCurrentConsumption(), getPreemptedResources());
}
@Override
public ResourceWeights getWeights() {
return scheduler.getAppWeight(this);
}
@Override
public Priority getPriority() {
// Right now per-app priorities are not passed to scheduler,
// so everyone has the same priority.
return priority;
}
@Override
public Resource getFairShare() {
return this.fairShare;
}
@Override
public void setFairShare(Resource fairShare) {
this.fairShare = fairShare;
}
@Override
public boolean isActive() {
return true;
}
@Override
public void updateDemand() {
demand = Resources.createResource(0);
// Demand is current consumption plus outstanding requests
Resources.addTo(demand, getCurrentConsumption());
// Add up outstanding resource requests
synchronized (this) {
for (Priority p : getPriorities()) {
for (ResourceRequest r : getResourceRequests(p).values()) {
Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
Resources.addTo(demand, total);
}
}
}
}
@Override
public Resource assignContainer(FSSchedulerNode node) {
return assignContainer(node, false);
}
/**
* Preempt a running container according to the priority
*/
@Override
public RMContainer preemptContainer() {
if (LOG.isDebugEnabled()) {
LOG.debug("App " + getName() + " is going to preempt a running " +
"container");
}
RMContainer toBePreempted = null;
for (RMContainer container : getLiveContainers()) {
if (!getPreemptionContainers().contains(container) &&
(toBePreempted == null ||
comparator.compare(toBePreempted, container) > 0)) {
toBePreempted = container;
}
}
return toBePreempted;
}
}

View File

@ -45,10 +45,10 @@ public class FSLeafQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName());
private final List<AppSchedulable> runnableAppScheds = // apps that are runnable
new ArrayList<AppSchedulable>();
private final List<AppSchedulable> nonRunnableAppScheds =
new ArrayList<AppSchedulable>();
private final List<FSAppAttempt> runnableApps = // apps that are runnable
new ArrayList<FSAppAttempt>();
private final List<FSAppAttempt> nonRunnableApps =
new ArrayList<FSAppAttempt>();
private Resource demand = Resources.createResource(0);
@ -70,33 +70,31 @@ public class FSLeafQueue extends FSQueue {
amResourceUsage = Resource.newInstance(0, 0);
}
public void addApp(FSSchedulerApp app, boolean runnable) {
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
public void addApp(FSAppAttempt app, boolean runnable) {
if (runnable) {
runnableAppScheds.add(appSchedulable);
runnableApps.add(app);
} else {
nonRunnableAppScheds.add(appSchedulable);
nonRunnableApps.add(app);
}
}
// for testing
void addAppSchedulable(AppSchedulable appSched) {
runnableAppScheds.add(appSched);
void addAppSchedulable(FSAppAttempt appSched) {
runnableApps.add(appSched);
}
/**
* Removes the given app from this queue.
* @return whether or not the app was runnable
*/
public boolean removeApp(FSSchedulerApp app) {
if (runnableAppScheds.remove(app.getAppSchedulable())) {
public boolean removeApp(FSAppAttempt app) {
if (runnableApps.remove(app)) {
// Update AM resource usage
if (app.isAmRunning() && app.getAMResource() != null) {
Resources.subtractFrom(amResourceUsage, app.getAMResource());
}
return true;
} else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
} else if (nonRunnableApps.remove(app)) {
return false;
} else {
throw new IllegalStateException("Given app to remove " + app +
@ -104,22 +102,22 @@ public class FSLeafQueue extends FSQueue {
}
}
public Collection<AppSchedulable> getRunnableAppSchedulables() {
return runnableAppScheds;
public Collection<FSAppAttempt> getRunnableAppSchedulables() {
return runnableApps;
}
public List<AppSchedulable> getNonRunnableAppSchedulables() {
return nonRunnableAppScheds;
public List<FSAppAttempt> getNonRunnableAppSchedulables() {
return nonRunnableApps;
}
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
for (AppSchedulable appSched : runnableAppScheds) {
apps.add(appSched.getApp().getApplicationAttemptId());
for (FSAppAttempt appSched : runnableApps) {
apps.add(appSched.getApplicationAttemptId());
}
for (AppSchedulable appSched : nonRunnableAppScheds) {
apps.add(appSched.getApp().getApplicationAttemptId());
for (FSAppAttempt appSched : nonRunnableApps) {
apps.add(appSched.getApplicationAttemptId());
}
}
@ -145,10 +143,10 @@ public class FSLeafQueue extends FSQueue {
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
for (AppSchedulable app : runnableAppScheds) {
for (FSAppAttempt app : runnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
for (AppSchedulable app : nonRunnableAppScheds) {
for (FSAppAttempt app : nonRunnableApps) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
@ -165,13 +163,13 @@ public class FSLeafQueue extends FSQueue {
Resource maxRes = scheduler.getAllocationConfiguration()
.getMaxResources(getName());
demand = Resources.createResource(0);
for (AppSchedulable sched : runnableAppScheds) {
for (FSAppAttempt sched : runnableApps) {
if (Resources.equals(demand, maxRes)) {
break;
}
updateDemandForApp(sched, maxRes);
}
for (AppSchedulable sched : nonRunnableAppScheds) {
for (FSAppAttempt sched : nonRunnableApps) {
if (Resources.equals(demand, maxRes)) {
break;
}
@ -183,7 +181,7 @@ public class FSLeafQueue extends FSQueue {
}
}
private void updateDemandForApp(AppSchedulable sched, Resource maxRes) {
private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) {
@ -207,9 +205,9 @@ public class FSLeafQueue extends FSQueue {
}
Comparator<Schedulable> comparator = policy.getComparator();
Collections.sort(runnableAppScheds, comparator);
for (AppSchedulable sched : runnableAppScheds) {
if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
Collections.sort(runnableApps, comparator);
for (FSAppAttempt sched : runnableApps) {
if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
continue;
}
@ -237,8 +235,8 @@ public class FSLeafQueue extends FSQueue {
// Choose the app that is most over fair share
Comparator<Schedulable> comparator = policy.getComparator();
AppSchedulable candidateSched = null;
for (AppSchedulable sched : runnableAppScheds) {
FSAppAttempt candidateSched = null;
for (FSAppAttempt sched : runnableApps) {
if (candidateSched == null ||
comparator.compare(sched, candidateSched) > 0) {
candidateSched = sched;
@ -291,7 +289,7 @@ public class FSLeafQueue extends FSQueue {
@Override
public int getNumRunnableApps() {
return runnableAppScheds.size();
return runnableApps.size();
}
@Override

View File

@ -39,7 +39,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public abstract class FSQueue extends Schedulable implements Queue {
public abstract class FSQueue implements Queue, Schedulable {
private Resource fairShare = Resources.createResource(0, 0);
private final String name;
protected final FairScheduler scheduler;
private final FSQueueMetrics metrics;
@ -140,9 +141,14 @@ public abstract class FSQueue extends Schedulable implements Queue {
return metrics;
}
/** Get the fair share assigned to this Schedulable. */
public Resource getFairShare() {
return fairShare;
}
@Override
public void setFairShare(Resource fairShare) {
super.setFairShare(fairShare);
this.fairShare = fairShare;
metrics.setFairShare(fairShare);
}
@ -187,4 +193,16 @@ public abstract class FSQueue extends Schedulable implements Queue {
}
return true;
}
@Override
public boolean isActive() {
return getNumRunnableApps() > 0;
}
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
}
}

View File

@ -1,360 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Represents an application attempt from the viewpoint of the Fair Scheduler.
*/
@Private
@Unstable
public class FSSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
private AppSchedulable appSchedulable;
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
private Resource preemptedResources = Resources.createResource(0);
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
}
public void setAppSchedulable(AppSchedulable appSchedulable) {
this.appSchedulable = appSchedulable;
}
public AppSchedulable getAppSchedulable() {
return appSchedulable;
}
synchronized public void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(
containerId,
containerStatus,
event)
);
LOG.info("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId);
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
}
public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(priority);
}
// Reset the re-reservation count
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size() + " at priority "
+ priority + "; currentReservation " + currentReservation);
}
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(priority).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than clustersize opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To acheive this
* we first only allow node-local assigments for a given prioirty level,
* then relax the locality threshold once we've had a long enough period
* without succesfully scheduling. We measure both the number of "missed"
* scheduling opportunities since the last container was scheduled
* at the current allowed level and the time since the last container
* was scheduled. Currently we use only the former.
*/
// Current locality threshold
final Map<Priority, NodeType> allowedLocalityLevel = new HashMap<
Priority, NodeType>();
/**
* Return the level at which we are allowed to schedule containers, given the
* current size of the cluster and thresholds indicating how many nodes to
* fail at (as a fraction of cluster size) before relaxing scheduling
* constraints.
*/
public synchronized NodeType getAllowedLocalityLevel(Priority priority,
int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
// upper limit on threshold
if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
// If delay scheduling is not being used, can schedule anywhere
if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
return NodeType.OFF_SWITCH;
}
// Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// If level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold.
if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority);
}
else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority);
}
}
return allowedLocalityLevel.get(priority);
}
/**
* Return the level at which we are allowed to schedule containers.
* Given the thresholds indicating how much time passed before relaxing
* scheduling constraints.
*/
public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
long nodeLocalityDelayMs, long rackLocalityDelayMs,
long currentTimeMs) {
// if not being used, can schedule anywhere
if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
return NodeType.OFF_SWITCH;
}
// default level is NODE_LOCAL
if (! allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// if level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH;
}
// check waiting time
long waitTime = currentTimeMs;
if (lastScheduledContainer.containsKey(priority)) {
waitTime -= lastScheduledContainer.get(priority);
} else {
waitTime -= appSchedulable.getStartTime();
}
long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityDelayMs : rackLocalityDelayMs;
if (waitTime > thresholdTime) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority, currentTimeMs);
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority, currentTimeMs);
}
}
return allowedLocalityLevel.get(priority);
}
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(priority);
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) &&
(type.equals(NodeType.NODE_LOCAL) ||
type.equals(NodeType.RACK_LOCAL))) {
this.resetAllowedLocalityLevel(priority, type);
}
else if (allowed.equals(NodeType.RACK_LOCAL) &&
type.equals(NodeType.NODE_LOCAL)) {
this.resetAllowedLocalityLevel(priority, type);
}
}
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(priority) <= 0) {
return null;
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId()
+ " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId());
return rmContainer;
}
/**
* Should be called when the scheduler assigns a container at a higher
* degree of locality than the current threshold. Reset the allowed locality
* level to a higher degree of locality.
*/
public synchronized void resetAllowedLocalityLevel(Priority priority,
NodeType level) {
NodeType old = allowedLocalityLevel.get(priority);
LOG.info("Raising locality level from " + old + " to " + level + " at " +
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
// related methods
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
preemptionMap.put(container, time);
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
public Long getContainerPreemptionTime(RMContainer container) {
return preemptionMap.get(container);
}
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
@Override
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
}
public Resource getPreemptedResources() {
return preemptedResources;
}
public void resetPreemptedResources() {
preemptedResources = Resources.createResource(0);
for (RMContainer container : getPreemptionContainers()) {
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
}
public void clearPreemptedResources() {
preemptedResources.setMemory(0);
preemptedResources.setVirtualCores(0);
}
}

View File

@ -35,7 +35,7 @@ public class FSSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
private AppSchedulable reservedAppSchedulable;
private FSAppAttempt reservedAppSchedulable;
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
super(node, usePortForNodeName);
@ -76,7 +76,7 @@ public class FSSchedulerNode extends SchedulerNode {
" on node " + this + " for application " + application);
}
setReservedContainer(container);
this.reservedAppSchedulable = ((FSSchedulerApp) application).getAppSchedulable();
this.reservedAppSchedulable = (FSAppAttempt) application;
}
@Override
@ -98,7 +98,7 @@ public class FSSchedulerNode extends SchedulerNode {
this.reservedAppSchedulable = null;
}
public synchronized AppSchedulable getReservedAppSchedulable() {
public synchronized FSAppAttempt getReservedAppSchedulable() {
return reservedAppSchedulable;
}
}

View File

@ -117,7 +117,7 @@ import com.google.common.base.Preconditions;
@Unstable
@SuppressWarnings("unchecked")
public class FairScheduler extends
AbstractYarnScheduler<FSSchedulerApp, FSSchedulerNode> {
AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
private FairSchedulerConfiguration conf;
private Resource incrAllocation;
@ -432,8 +432,8 @@ public class FairScheduler extends
try {
// Reset preemptedResource for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
app.getApp().resetPreemptedResources();
for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
app.resetPreemptedResources();
}
}
@ -453,8 +453,8 @@ public class FairScheduler extends
} finally {
// Clear preemptedResources for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
app.getApp().clearPreemptedResources();
for (FSAppAttempt app : queue.getRunnableAppSchedulables()) {
app.clearPreemptedResources();
}
}
}
@ -465,7 +465,7 @@ public class FairScheduler extends
protected void warnOrKillContainer(RMContainer container) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSSchedulerApp app = getSchedulerApp(appAttemptId);
FSAppAttempt app = getSchedulerApp(appAttemptId);
FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
@ -490,7 +490,7 @@ public class FairScheduler extends
(getClock().getTime() - time) + "ms)");
}
} else {
// track the request in the FSSchedulerApp itself
// track the request in the FSAppAttempt itself
app.addPreemption(container, getClock().getTime());
}
}
@ -541,7 +541,7 @@ public class FairScheduler extends
}
// synchronized for sizeBasedWeight
public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
public synchronized ResourceWeights getAppWeight(FSAppAttempt app) {
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on current memory demand
@ -636,8 +636,8 @@ public class FairScheduler extends
return;
}
SchedulerApplication<FSSchedulerApp> application =
new SchedulerApplication<FSSchedulerApp>(queue, user);
SchedulerApplication<FSAppAttempt> application =
new SchedulerApplication<FSAppAttempt>(queue, user);
applications.put(applicationId, application);
queue.getMetrics().submitApp(user);
@ -661,13 +661,13 @@ public class FairScheduler extends
ApplicationAttemptId applicationAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
SchedulerApplication<FSSchedulerApp> application =
SchedulerApplication<FSAppAttempt> application =
applications.get(applicationAttemptId.getApplicationId());
String user = application.getUser();
FSLeafQueue queue = (FSLeafQueue) application.getQueue();
FSSchedulerApp attempt =
new FSSchedulerApp(applicationAttemptId, user,
FSAppAttempt attempt =
new FSAppAttempt(this, applicationAttemptId, user,
queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
if (transferStateFromPreviousAttempt) {
@ -742,7 +742,7 @@ public class FairScheduler extends
private synchronized void removeApplication(ApplicationId applicationId,
RMAppState finalState) {
SchedulerApplication<FSSchedulerApp> application =
SchedulerApplication<FSAppAttempt> application =
applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
@ -757,9 +757,9 @@ public class FairScheduler extends
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) {
LOG.info("Application " + applicationAttemptId + " is done." +
" finalState=" + rmAppAttemptFinalState);
SchedulerApplication<FSSchedulerApp> application =
SchedulerApplication<FSAppAttempt> application =
applications.get(applicationAttemptId.getApplicationId());
FSSchedulerApp attempt = getSchedulerApp(applicationAttemptId);
FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
if (attempt == null || application == null) {
LOG.info("Unknown application " + applicationAttemptId + " has completed!");
@ -820,7 +820,7 @@ public class FairScheduler extends
Container container = rmContainer.getContainer();
// Get the application for the finished container
FSSchedulerApp application =
FSAppAttempt application =
getCurrentAttemptForContainer(container.getId());
ApplicationId appId =
container.getId().getApplicationAttemptId().getApplicationId();
@ -835,8 +835,7 @@ public class FairScheduler extends
FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(node, rmContainer.getReservedPriority());
node.unreserveResource(application);
application.unreserve(rmContainer.getReservedPriority(), node);
} else {
application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container);
@ -896,7 +895,7 @@ public class FairScheduler extends
List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
// Make sure this application exists
FSSchedulerApp application = getSchedulerApp(appAttemptId);
FSAppAttempt application = getSchedulerApp(appAttemptId);
if (application == null) {
LOG.info("Calling allocate on removed " +
"or non existant application " + appAttemptId);
@ -1066,13 +1065,13 @@ public class FairScheduler extends
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable();
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
// Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node " + node);
reservedAppSchedulable.unreserve(reservedPriority, node);
reservedAppSchedulable = null;
@ -1080,7 +1079,7 @@ public class FairScheduler extends
// Reservation exists; try to fulfill the reservation
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to fulfill reservation for application "
+ reservedAppSchedulable.getApp().getApplicationAttemptId()
+ reservedAppSchedulable.getApplicationAttemptId()
+ " on node: " + node);
}
@ -1105,8 +1104,8 @@ public class FairScheduler extends
updateRootQueueMetrics();
}
public FSSchedulerApp getSchedulerApp(ApplicationAttemptId appAttemptId) {
return (FSSchedulerApp) super.getApplicationAttempt(appAttemptId);
public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
return super.getApplicationAttempt(appAttemptId);
}
/**
@ -1268,8 +1267,8 @@ public class FairScheduler extends
fsOpDurations = FSOpDurations.getInstance(true);
// This stores per-application scheduling information
this.applications =
new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
this.applications = new ConcurrentHashMap<
ApplicationId, SchedulerApplication<FSAppAttempt>>();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
@ -1369,7 +1368,7 @@ public class FairScheduler extends
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo() {
UserGroupInformation user = null;
UserGroupInformation user;
try {
user = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
@ -1431,11 +1430,11 @@ public class FairScheduler extends
@Override
public synchronized String moveApplication(ApplicationId appId,
String queueName) throws YarnException {
SchedulerApplication<FSSchedulerApp> app = applications.get(appId);
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
if (app == null) {
throw new YarnException("App to be moved " + appId + " not found.");
}
FSSchedulerApp attempt = (FSSchedulerApp) app.getCurrentAppAttempt();
FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
// To serialize with FairScheduler#allocate, synchronize on app attempt
synchronized (attempt) {
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
@ -1448,8 +1447,7 @@ public class FairScheduler extends
return oldQueue.getQueueName();
}
if (oldQueue.getRunnableAppSchedulables().contains(
attempt.getAppSchedulable())) {
if (oldQueue.getRunnableAppSchedulables().contains(attempt)) {
verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
}
@ -1458,7 +1456,7 @@ public class FairScheduler extends
}
}
private void verifyMoveDoesNotViolateConstraints(FSSchedulerApp app,
private void verifyMoveDoesNotViolateConstraints(FSAppAttempt app,
FSLeafQueue oldQueue, FSLeafQueue targetQueue) throws YarnException {
String queueName = targetQueue.getQueueName();
ApplicationAttemptId appAttId = app.getApplicationAttemptId();
@ -1495,8 +1493,8 @@ public class FairScheduler extends
* Helper for moveApplication, which has appropriate synchronization, so all
* operations will be atomic.
*/
private void executeMove(SchedulerApplication<FSSchedulerApp> app,
FSSchedulerApp attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
private void executeMove(SchedulerApplication<FSAppAttempt> app,
FSAppAttempt attempt, FSLeafQueue oldQueue, FSLeafQueue newQueue) {
boolean wasRunnable = oldQueue.removeApp(attempt);
// if app was not runnable before, it may be runnable now
boolean nowRunnable = maxRunningEnforcer.canAppBeRunnable(newQueue,

View File

@ -25,15 +25,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Order {@link AppSchedulable} objects by priority and then by submit time, as
* Order {@link FSAppAttempt} objects by priority and then by submit time, as
* in the default scheduler in Hadoop.
*/
@Private
@Unstable
public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable {
public class FifoAppComparator implements Comparator<FSAppAttempt>, Serializable {
private static final long serialVersionUID = 3428835083489547918L;
public int compare(AppSchedulable a1, AppSchedulable a2) {
public int compare(FSAppAttempt a1, FSAppAttempt a2) {
int res = a1.getPriority().compareTo(a2.getPriority());
if (res == 0) {
if (a1.getStartTime() < a2.getStartTime()) {
@ -44,7 +44,7 @@ public class FifoAppComparator implements Comparator<AppSchedulable>, Serializab
}
if (res == 0) {
// If there is a tie, break it by app ID to get a deterministic order
res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId());
res = a1.getApplicationId().compareTo(a2.getApplicationId());
}
return res;
}

View File

@ -43,7 +43,7 @@ public class MaxRunningAppsEnforcer {
// Tracks the number of running applications by user.
private final Map<String, Integer> usersNumRunnableApps;
@VisibleForTesting
final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
final ListMultimap<String, FSAppAttempt> usersNonRunnableApps;
public MaxRunningAppsEnforcer(FairScheduler scheduler) {
this.scheduler = scheduler;
@ -80,7 +80,7 @@ public class MaxRunningAppsEnforcer {
* Tracks the given new runnable app for purposes of maintaining max running
* app limits.
*/
public void trackRunnableApp(FSSchedulerApp app) {
public void trackRunnableApp(FSAppAttempt app) {
String user = app.getUser();
FSLeafQueue queue = app.getQueue();
// Increment running counts for all parent queues
@ -99,9 +99,9 @@ public class MaxRunningAppsEnforcer {
* Tracks the given new non runnable app so that it can be made runnable when
* it would not violate max running app limits.
*/
public void trackNonRunnableApp(FSSchedulerApp app) {
public void trackNonRunnableApp(FSAppAttempt app) {
String user = app.getUser();
usersNonRunnableApps.put(user, app.getAppSchedulable());
usersNonRunnableApps.put(user, app);
}
/**
@ -111,7 +111,7 @@ public class MaxRunningAppsEnforcer {
* Runs in O(n log(n)) where n is the number of queues that are under the
* highest queue that went from having no slack to having slack.
*/
public void updateRunnabilityOnAppRemoval(FSSchedulerApp app, FSLeafQueue queue) {
public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
// childqueueX might have no pending apps itself, but if a queue higher up
@ -133,8 +133,8 @@ public class MaxRunningAppsEnforcer {
parent = parent.getParent();
}
List<List<AppSchedulable>> appsNowMaybeRunnable =
new ArrayList<List<AppSchedulable>>();
List<List<FSAppAttempt>> appsNowMaybeRunnable =
new ArrayList<List<FSAppAttempt>>();
// Compile lists of apps which may now be runnable
// We gather lists instead of building a set of all non-runnable apps so
@ -150,26 +150,26 @@ public class MaxRunningAppsEnforcer {
userNumRunning = 0;
}
if (userNumRunning == allocConf.getUserMaxApps(user) - 1) {
List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
List<FSAppAttempt> userWaitingApps = usersNonRunnableApps.get(user);
if (userWaitingApps != null) {
appsNowMaybeRunnable.add(userWaitingApps);
}
}
// Scan through and check whether this means that any apps are now runnable
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
FSSchedulerApp prev = null;
List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>();
FSAppAttempt prev = null;
List<FSAppAttempt> noLongerPendingApps = new ArrayList<FSAppAttempt>();
while (iter.hasNext()) {
FSSchedulerApp next = iter.next();
FSAppAttempt next = iter.next();
if (next == prev) {
continue;
}
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
trackRunnableApp(next);
AppSchedulable appSched = next.getAppSchedulable();
FSAppAttempt appSched = next;
next.getQueue().getRunnableAppSchedulables().add(appSched);
noLongerPendingApps.add(appSched);
@ -186,14 +186,14 @@ public class MaxRunningAppsEnforcer {
// We remove the apps from their pending lists afterwards so that we don't
// pull them out from under the iterator. If they are not in these lists
// in the first place, there is a bug.
for (AppSchedulable appSched : noLongerPendingApps) {
if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables()
for (FSAppAttempt appSched : noLongerPendingApps) {
if (!appSched.getQueue().getNonRunnableAppSchedulables()
.remove(appSched)) {
LOG.error("Can't make app runnable that does not already exist in queue"
+ " as non-runnable: " + appSched + ". This should never happen.");
}
if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) {
if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
LOG.error("Waiting app " + appSched + " expected to be in "
+ "usersNonRunnableApps, but was not. This should never happen.");
}
@ -204,7 +204,7 @@ public class MaxRunningAppsEnforcer {
* Updates the relevant tracking variables after a runnable app with the given
* queue and user has been removed.
*/
public void untrackRunnableApp(FSSchedulerApp app) {
public void untrackRunnableApp(FSAppAttempt app) {
// Update usersRunnableApps
String user = app.getUser();
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
@ -226,8 +226,8 @@ public class MaxRunningAppsEnforcer {
/**
* Stops tracking the given non-runnable app
*/
public void untrackNonRunnableApp(FSSchedulerApp app) {
usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable());
public void untrackNonRunnableApp(FSAppAttempt app) {
usersNonRunnableApps.remove(app.getUser(), app);
}
/**
@ -235,7 +235,7 @@ public class MaxRunningAppsEnforcer {
* of non-runnable applications.
*/
private void gatherPossiblyRunnableAppLists(FSQueue queue,
List<List<AppSchedulable>> appLists) {
List<List<FSAppAttempt>> appLists) {
if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration()
.getQueueMaxApps(queue.getName())) {
if (queue instanceof FSLeafQueue) {
@ -259,14 +259,14 @@ public class MaxRunningAppsEnforcer {
* of O(num lists) time.
*/
static class MultiListStartTimeIterator implements
Iterator<FSSchedulerApp> {
Iterator<FSAppAttempt> {
private List<AppSchedulable>[] appLists;
private List<FSAppAttempt>[] appLists;
private int[] curPositionsInAppLists;
private PriorityQueue<IndexAndTime> appListsByCurStartTime;
@SuppressWarnings("unchecked")
public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) {
public MultiListStartTimeIterator(List<List<FSAppAttempt>> appListList) {
appLists = appListList.toArray(new List[appListList.size()]);
curPositionsInAppLists = new int[appLists.length];
appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
@ -284,10 +284,10 @@ public class MaxRunningAppsEnforcer {
}
@Override
public FSSchedulerApp next() {
public FSAppAttempt next() {
IndexAndTime indexAndTime = appListsByCurStartTime.remove();
int nextListIndex = indexAndTime.index;
AppSchedulable next = appLists[nextListIndex]
FSAppAttempt next = appLists[nextListIndex]
.get(curPositionsInAppLists[nextListIndex]);
curPositionsInAppLists[nextListIndex]++;
@ -299,7 +299,7 @@ public class MaxRunningAppsEnforcer {
}
appListsByCurStartTime.add(indexAndTime);
return next.getApp();
return next;
}
@Override

View File

@ -48,7 +48,7 @@ public class NewAppWeightBooster extends Configured implements WeightAdjuster {
super.setConf(conf);
}
public double adjustWeight(AppSchedulable app, double curWeight) {
public double adjustWeight(FSAppAttempt app, double curWeight) {
long start = app.getStartTime();
long now = System.currentTimeMillis();
if (now - start < duration) {

View File

@ -27,20 +27,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* A Schedulable represents an entity that can launch tasks, such as a job
* or a queue. It provides a common interface so that algorithms such as fair
* sharing can be applied both within a queue and across queues. There are
* currently two types of Schedulables: JobSchedulables, which represent a
* single job, and QueueSchedulables, which allocate among jobs in their queue.
*
* Separate sets of Schedulables are used for maps and reduces. Each queue has
* both a mapSchedulable and a reduceSchedulable, and so does each job.
* A Schedulable represents an entity that can be scheduled such as an
* application or a queue. It provides a common interface so that algorithms
* such as fair sharing can be applied both within a queue and across queues.
*
* A Schedulable is responsible for three roles:
* 1) It can launch tasks through assignTask().
* 2) It provides information about the job/queue to the scheduler, including:
* 1) Assign resources through {@link #assignContainer}.
* 2) It provides information about the app/queue to the scheduler, including:
* - Demand (maximum number of tasks required)
* - Number of currently running tasks
* - Minimum share (for queues)
* - Job/queue weight (for fair sharing)
* - Start time and priority (for FIFO)
@ -57,81 +51,61 @@ import org.apache.hadoop.yarn.util.resource.Resources;
*/
@Private
@Unstable
public abstract class Schedulable {
/** Fair share assigned to this Schedulable */
private Resource fairShare = Resources.createResource(0);
public interface Schedulable {
/**
* Name of job/queue, used for debugging as well as for breaking ties in
* scheduling order deterministically.
*/
public abstract String getName();
public String getName();
/**
* Maximum number of resources required by this Schedulable. This is defined as
* number of currently utilized resources + number of unlaunched resources (that
* are either not yet launched or need to be speculated).
*/
public abstract Resource getDemand();
public Resource getDemand();
/** Get the aggregate amount of resources consumed by the schedulable. */
public abstract Resource getResourceUsage();
public Resource getResourceUsage();
/** Minimum Resource share assigned to the schedulable. */
public abstract Resource getMinShare();
public Resource getMinShare();
/** Maximum Resource share assigned to the schedulable. */
public abstract Resource getMaxShare();
public Resource getMaxShare();
/** Job/queue weight in fair sharing. */
public abstract ResourceWeights getWeights();
public ResourceWeights getWeights();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public abstract long getStartTime();
public long getStartTime();
/** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
public abstract Priority getPriority();
public Priority getPriority();
/** Refresh the Schedulable's demand and those of its children if any. */
public abstract void updateDemand();
public void updateDemand();
/**
* Assign a container on this node if possible, and return the amount of
* resources assigned.
*/
public abstract Resource assignContainer(FSSchedulerNode node);
public Resource assignContainer(FSSchedulerNode node);
/**
* Preempt a container from this Schedulable if possible.
*/
public abstract RMContainer preemptContainer();
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare) {
this.fairShare = fairShare;
}
public RMContainer preemptContainer();
/** Get the fair share assigned to this Schedulable. */
public Resource getFairShare() {
return fairShare;
}
public Resource getFairShare();
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare);
/**
* Returns true if queue has atleast one app running. Always returns true for
* AppSchedulables.
*/
public boolean isActive() {
if (this instanceof FSQueue) {
FSQueue queue = (FSQueue) this;
return queue.getNumRunnableApps() > 0;
}
return true;
}
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
}
public boolean isActive();
}

View File

@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurable;
@Private
@Unstable
public interface WeightAdjuster {
public double adjustWeight(AppSchedulable app, double curWeight);
public double adjustWeight(FSAppAttempt app, double curWeight);
}

View File

@ -46,8 +46,7 @@ public class FairSchedulerInfo extends SchedulerInfo {
}
public int getAppFairShare(ApplicationAttemptId appAttemptId) {
return scheduler.getSchedulerApp(appAttemptId).
getAppSchedulable().getFairShare().getMemory();
return scheduler.getSchedulerApp(appAttemptId).getFairShare().getMemory();
}
public FairSchedulerQueueInfo getRootQueueInfo() {

View File

@ -24,7 +24,8 @@ import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
@ -39,9 +40,9 @@ public class FairSchedulerLeafQueueInfo extends FairSchedulerQueueInfo {
public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
super(queue, scheduler);
Collection<AppSchedulable> apps = queue.getRunnableAppSchedulables();
for (AppSchedulable app : apps) {
if (app.getApp().isPending()) {
Collection<FSAppAttempt> apps = queue.getRunnableAppSchedulables();
for (FSAppAttempt app : apps) {
if (app.isPending()) {
numPendingApps++;
} else {
numActiveApps++;

View File

@ -28,10 +28,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Dummy implementation of Schedulable for unit testing.
*/
public class FakeSchedulable extends Schedulable {
public class FakeSchedulable implements Schedulable {
private Resource usage;
private Resource minShare;
private Resource maxShare;
private Resource fairShare;
private ResourceWeights weights;
private Priority priority;
private long startTime;
@ -89,6 +90,21 @@ public class FakeSchedulable extends Schedulable {
return null;
}
@Override
public Resource getFairShare() {
return this.fairShare;
}
@Override
public void setFairShare(Resource fairShare) {
this.fairShare = fairShare;
}
@Override
public boolean isActive() {
return true;
}
@Override
public Resource getDemand() {
return null;

View File

@ -18,18 +18,20 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSSchedulerApp {
public class TestFSAppAttempt extends FairSchedulerTestBase {
private class MockClock implements Clock {
private long time = 0;
@ -44,11 +46,12 @@ public class TestFSSchedulerApp {
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
@Before
public void setup() {
Configuration conf = createConfiguration();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
}
@Test
@ -60,11 +63,10 @@ public class TestFSSchedulerApp {
double rackLocalityThreshold = .6;
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getEpoch()).thenReturn(0);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null,
rmContext);
RMContext rmContext = resourceManager.getRMContext();
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue ,
null, rmContext);
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
@ -113,25 +115,21 @@ public class TestFSSchedulerApp {
@Test
public void testDelaySchedulingForContinuousScheduling()
throws InterruptedException {
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue("queue", true);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
MockClock clock = new MockClock();
scheduler.setClock(clock);
long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds
long rackLocalityDelayMs = 6 * 1000L; // 6 seconds
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getEpoch()).thenReturn(0);
RMContext rmContext = resourceManager.getRMContext();
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue,
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue,
null, rmContext);
AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class);
long startTime = clock.getTime();
Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime);
schedulerApp.setAppSchedulable(appSchedulable);
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL,
@ -179,12 +177,11 @@ public class TestFSSchedulerApp {
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getEpoch()).thenReturn(0);
RMContext rmContext = resourceManager.getRMContext();
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null,
rmContext);
FSAppAttempt schedulerApp =
new FSAppAttempt(scheduler, applicationAttemptId, "user1", queue ,
null, rmContext);
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0));
}

View File

@ -62,7 +62,7 @@ public class TestFSLeafQueue {
@Test
public void testUpdateDemand() {
AppSchedulable app = mock(AppSchedulable.class);
FSAppAttempt app = mock(FSAppAttempt.class);
Mockito.when(app.getDemand()).thenReturn(maxResource);
schedulable.addAppSchedulable(app);

View File

@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@ -82,13 +81,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@ -1539,7 +1536,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
assertEquals(1, app.getLiveContainers().size());
ContainerId containerId = scheduler.getSchedulerApp(attId)
@ -1613,9 +1610,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"norealuserhasthisname2", 1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
assertNotNull("The application was not allowed", app1);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
assertNull("The application was allowed", app2);
}
@ -1688,8 +1685,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
"user1", 2);
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", true);
queue1.setPolicy(new FifoPolicy());
@ -1731,7 +1728,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId =
createSchedulingRequest(1024, "root.default", "user", 8);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
// set maxAssign to 2: only 2 containers should be allocated
scheduler.maxAssign = 2;
@ -1766,7 +1763,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId =
createSchedulingRequest(0, 1, "root.default", "user", 8);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
// set maxAssign to 2: only 2 containers should be allocated
scheduler.maxAssign = 2;
@ -1830,10 +1827,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId4 =
createSchedulingRequest(1024, fifoQueue, user, 4);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
scheduler.getQueueManager().getLeafQueue(fifoQueue, true)
.setPolicy(SchedulingPolicy.parse("fifo"));
@ -1952,7 +1949,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
assertEquals(0, app.getLiveContainers().size());
assertEquals(0, app.getReservedContainers().size());
@ -2025,7 +2022,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
FSAppAttempt app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@ -2066,7 +2063,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
NodeUpdateSchedulerEvent node2UpdateEvent = new NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
FSAppAttempt app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@ -2101,7 +2098,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
"user1", 0);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
ResourceRequest nodeRequest = createResourceRequest(1024, node2.getHostName(), 1, 2, true);
ResourceRequest rackRequest = createResourceRequest(1024, "rack1", 1, 2, true);
@ -2143,7 +2140,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default",
"user1", 2);
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
@ -2165,10 +2162,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource());
@ -2208,13 +2205,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1",
"user1", 2);
FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1",
"user1", 2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource());
@ -2247,19 +2244,19 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId appAttId1 = createSchedulingRequest(3074, 1, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app1 = scheduler.getSchedulerApp(appAttId1);
FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1);
ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 3, "queue1.subqueue1",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app2 = scheduler.getSchedulerApp(appAttId2);
FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2);
ApplicationAttemptId appAttId3 = createSchedulingRequest(2048, 2, "queue1.subqueue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app3 = scheduler.getSchedulerApp(appAttId3);
FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3);
ApplicationAttemptId appAttId4 = createSchedulingRequest(1024, 2, "queue2",
"user1", 2);
Thread.sleep(3); // so that start times will be different
FSSchedulerApp app4 = scheduler.getSchedulerApp(appAttId4);
FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4);
DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy();
drfPolicy.initialize(scheduler.getClusterResource());
@ -2341,7 +2338,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
NodeUpdateSchedulerEvent(node2);
// no matter how many heartbeats, node2 should never get a container
FSSchedulerApp app = scheduler.getSchedulerApp(attId1);
FSAppAttempt app = scheduler.getSchedulerApp(attId1);
for (int i = 0; i < 10; i++) {
scheduler.handle(node2UpdateEvent);
assertEquals(0, app.getLiveContainers().size());
@ -2353,14 +2350,14 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
FSSchedulerApp app = scheduler.getSchedulerApp(attId);
FSAppAttempt app = scheduler.getSchedulerApp(attId);
FSLeafQueue queue = app.getQueue();
Collection<AppSchedulable> runnableApps =
Collection<FSAppAttempt> runnableApps =
queue.getRunnableAppSchedulables();
Collection<AppSchedulable> nonRunnableApps =
Collection<FSAppAttempt> nonRunnableApps =
queue.getNonRunnableAppSchedulables();
assertEquals(runnable, runnableApps.contains(app.getAppSchedulable()));
assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable()));
assertEquals(runnable, runnableApps.contains(app));
assertEquals(!runnable, nonRunnableApps.contains(app));
}
private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue,
@ -2465,7 +2462,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(attId1, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application1's AM requests 1024 MB memory",
@ -2479,7 +2476,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
createApplicationWithAMResource(attId2, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application2's AM requests 1024 MB memory",
@ -2493,7 +2490,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId3 = createAppAttemptId(3, 1);
createApplicationWithAMResource(attId3, "queue1", "user1", amResource1);
createSchedulingRequestExistingApplication(1024, 1, amPriority, attId3);
FSSchedulerApp app3 = scheduler.getSchedulerApp(attId3);
FSAppAttempt app3 = scheduler.getSchedulerApp(attId3);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application3's AM requests 1024 MB memory",
@ -2529,7 +2526,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId4 = createAppAttemptId(4, 1);
createApplicationWithAMResource(attId4, "queue1", "user1", amResource2);
createSchedulingRequestExistingApplication(2048, 2, amPriority, attId4);
FSSchedulerApp app4 = scheduler.getSchedulerApp(attId4);
FSAppAttempt app4 = scheduler.getSchedulerApp(attId4);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application4's AM requests 2048 MB memory",
@ -2543,7 +2540,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId5 = createAppAttemptId(5, 1);
createApplicationWithAMResource(attId5, "queue1", "user1", amResource2);
createSchedulingRequestExistingApplication(2048, 2, amPriority, attId5);
FSSchedulerApp app5 = scheduler.getSchedulerApp(attId5);
FSAppAttempt app5 = scheduler.getSchedulerApp(attId5);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application5's AM requests 2048 MB memory",
@ -2586,7 +2583,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId6 = createAppAttemptId(6, 1);
createApplicationWithAMResource(attId6, "queue1", "user1", amResource3);
createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6);
FSSchedulerApp app6 = scheduler.getSchedulerApp(attId6);
FSAppAttempt app6 = scheduler.getSchedulerApp(attId6);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application6's AM should not be running",
@ -2677,7 +2674,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
createApplicationWithAMResource(attId1, "queue1", "test1", amResource1);
createSchedulingRequestExistingApplication(2048, 1, amPriority, attId1);
FSSchedulerApp app1 = scheduler.getSchedulerApp(attId1);
FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application1's AM requests 2048 MB memory",
@ -2691,7 +2688,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId attId2 = createAppAttemptId(2, 1);
createApplicationWithAMResource(attId2, "queue2", "test1", amResource1);
createSchedulingRequestExistingApplication(2048, 1, amPriority, attId2);
FSSchedulerApp app2 = scheduler.getSchedulerApp(attId2);
FSAppAttempt app2 = scheduler.getSchedulerApp(attId2);
scheduler.update();
scheduler.handle(updateEvent);
assertEquals("Application2's AM requests 2048 MB memory",
@ -2823,7 +2820,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
// at least one pass
Thread.sleep(fs.getConf().getContinuousSchedulingSleepMs() + 500);
FSSchedulerApp app = fs.getSchedulerApp(appAttemptId);
FSAppAttempt app = fs.getSchedulerApp(appAttemptId);
// Wait until app gets resources.
while (app.getCurrentConsumption().equals(Resources.none())) { }
@ -3007,7 +3004,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
.size());
FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
// ResourceRequest will be empty once NodeUpdate is completed
Assert.assertNull(app.getResourceRequest(priority, host));
@ -3063,7 +3060,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId appAttemptId =
createSchedulingRequest(GB, "root.default", "user", 1);
FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
// Verify the blacklist can be updated independent of requesting containers
scheduler.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
@ -3171,12 +3168,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand());
scheduler.moveApplication(appId, "queue2");
FSSchedulerApp app = scheduler.getSchedulerApp(appAttId);
FSAppAttempt app = scheduler.getSchedulerApp(appAttId);
assertSame(targetQueue, app.getQueue());
assertFalse(oldQueue.getRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertTrue(targetQueue.getRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertFalse(oldQueue.getRunnableAppSchedulables().contains(app));
assertTrue(targetQueue.getRunnableAppSchedulables().contains(app));
assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage());
assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage());
assertEquals(0, oldQueue.getNumRunnableApps());
@ -3224,17 +3219,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
ApplicationAttemptId appAttId =
createSchedulingRequest(1024, 1, "queue1", "user1", 3);
FSSchedulerApp app = scheduler.getSchedulerApp(appAttId);
assertTrue(oldQueue.getNonRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
FSAppAttempt app = scheduler.getSchedulerApp(appAttId);
assertTrue(oldQueue.getNonRunnableAppSchedulables().contains(app));
scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
assertFalse(oldQueue.getNonRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertFalse(targetQueue.getNonRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertTrue(targetQueue.getRunnableAppSchedulables()
.contains(app.getAppSchedulable()));
assertFalse(oldQueue.getNonRunnableAppSchedulables().contains(app));
assertFalse(targetQueue.getNonRunnableAppSchedulables().contains(app));
assertTrue(targetQueue.getRunnableAppSchedulables().contains(app));
assertEquals(1, targetQueue.getNumRunnableApps());
assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());
}

View File

@ -42,12 +42,13 @@ public class TestMaxRunningAppsEnforcer {
private int appNum;
private TestFairScheduler.MockClock clock;
private RMContext rmContext;
private FairScheduler scheduler;
@Before
public void setup() throws Exception {
Configuration conf = new Configuration();
clock = new TestFairScheduler.MockClock();
FairScheduler scheduler = mock(FairScheduler.class);
scheduler = mock(FairScheduler.class);
when(scheduler.getConf()).thenReturn(
new FairSchedulerConfiguration(conf));
when(scheduler.getClock()).thenReturn(clock);
@ -65,11 +66,11 @@ public class TestMaxRunningAppsEnforcer {
when(rmContext.getEpoch()).thenReturn(0);
}
private FSSchedulerApp addApp(FSLeafQueue queue, String user) {
private FSAppAttempt addApp(FSLeafQueue queue, String user) {
ApplicationId appId = ApplicationId.newInstance(0l, appNum++);
ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user);
FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null,
FSAppAttempt app = new FSAppAttempt(scheduler, attId, user, queue, null,
rmContext);
queue.addApp(app, runnable);
if (runnable) {
@ -80,7 +81,7 @@ public class TestMaxRunningAppsEnforcer {
return app;
}
private void removeApp(FSSchedulerApp app) {
private void removeApp(FSAppAttempt app) {
app.getQueue().removeApp(app);
maxAppsEnforcer.untrackRunnableApp(app);
maxAppsEnforcer.updateRunnabilityOnAppRemoval(app, app.getQueue());
@ -93,7 +94,7 @@ public class TestMaxRunningAppsEnforcer {
queueMaxApps.put("root", 2);
queueMaxApps.put("root.queue1", 1);
queueMaxApps.put("root.queue2", 1);
FSSchedulerApp app1 = addApp(leaf1, "user");
FSAppAttempt app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
assertEquals(1, leaf1.getRunnableAppSchedulables().size());
@ -110,7 +111,7 @@ public class TestMaxRunningAppsEnforcer {
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
queueMaxApps.put("root.queue1", 2);
FSSchedulerApp app1 = addApp(leaf1, "user");
FSAppAttempt app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
assertEquals(1, leaf1.getRunnableAppSchedulables().size());
@ -128,7 +129,7 @@ public class TestMaxRunningAppsEnforcer {
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true);
queueMaxApps.put("root.queue1.leaf1", 2);
userMaxApps.put("user1", 1);
FSSchedulerApp app1 = addApp(leaf1, "user1");
FSAppAttempt app1 = addApp(leaf1, "user1");
addApp(leaf1, "user2");
addApp(leaf1, "user3");
addApp(leaf2, "user1");
@ -147,7 +148,7 @@ public class TestMaxRunningAppsEnforcer {
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
queueMaxApps.put("root.queue1", 2);
FSSchedulerApp app1 = addApp(leaf1, "user");
FSAppAttempt app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
clock.tick(20);
@ -167,7 +168,7 @@ public class TestMaxRunningAppsEnforcer {
FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
queueMaxApps.put("root.queue1", 2);
FSSchedulerApp app1 = addApp(leaf1, "user");
FSAppAttempt app1 = addApp(leaf1, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
addApp(leaf2, "user");
@ -182,21 +183,18 @@ public class TestMaxRunningAppsEnforcer {
@Test
public void testMultiListStartTimeIteratorEmptyAppLists() {
List<List<AppSchedulable>> lists = new ArrayList<List<AppSchedulable>>();
lists.add(Arrays.asList(mockAppSched(1)));
lists.add(Arrays.asList(mockAppSched(2)));
Iterator<FSSchedulerApp> iter =
List<List<FSAppAttempt>> lists = new ArrayList<List<FSAppAttempt>>();
lists.add(Arrays.asList(mockAppAttempt(1)));
lists.add(Arrays.asList(mockAppAttempt(2)));
Iterator<FSAppAttempt> iter =
new MaxRunningAppsEnforcer.MultiListStartTimeIterator(lists);
assertEquals(1, iter.next().getAppSchedulable().getStartTime());
assertEquals(2, iter.next().getAppSchedulable().getStartTime());
assertEquals(1, iter.next().getStartTime());
assertEquals(2, iter.next().getStartTime());
}
private AppSchedulable mockAppSched(long startTime) {
AppSchedulable appSched = mock(AppSchedulable.class);
when(appSched.getStartTime()).thenReturn(startTime);
FSSchedulerApp schedApp = mock(FSSchedulerApp.class);
when(schedApp.getAppSchedulable()).thenReturn(appSched);
when(appSched.getApp()).thenReturn(schedApp);
return appSched;
private FSAppAttempt mockAppAttempt(long startTime) {
FSAppAttempt schedApp = mock(FSAppAttempt.class);
when(schedApp.getStartTime()).thenReturn(startTime);
return schedApp;
}
}