YARN-2399. Delete old versions of files. FairScheduler: Merge AppSchedulable and FSSchedulerApp into FSAppAttempt. (kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617619 13f79535-47bb-0310-9956-ffa450edef68
Karthik Kambatla 2014-08-12 22:51:57 +00:00
parent d687f6f689
commit 4239695588
3 changed files with 0 additions and 1014 deletions
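
The production classes deleted below were folded into a single FSAppAttempt class. As a rough orientation only (the merged file is not part of this diff, and its exact shape is an assumption rather than something this commit shows), the combined class looks approximately like this:

// Sketch only, not the actual merged source: FSAppAttempt takes over both
// roles, keeping the SchedulerApplicationAttempt state that lived in
// FSSchedulerApp and the Schedulable behavior that lived in AppSchedulable.
@Private
@Unstable
public class FSAppAttempt extends SchedulerApplicationAttempt
implements Schedulable {
// fields formerly on AppSchedulable: demand, startTime, priority, resourceWeights, ...
// methods formerly on FSSchedulerApp: allocate(), unreserve(), containerCompleted(), ...
}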

AppSchedulable.java (deleted)

@@ -1,463 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@Unstable
public class AppSchedulable extends Schedulable {
private static final DefaultResourceCalculator RESOURCE_CALCULATOR
= new DefaultResourceCalculator();
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
private long startTime;
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
private FSLeafQueue queue;
private RMContainerTokenSecretManager containerTokenSecretManager;
private Priority priority;
private ResourceWeights resourceWeights;
private RMContainerComparator comparator = new RMContainerComparator();
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
this.startTime = scheduler.getClock().getTime();
this.queue = queue;
this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager();
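// Per-app priorities are not yet passed to the scheduler, so every app gets the same fixed priority (see getPriority() below).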
this.priority = Priority.newInstance(1);
this.resourceWeights = new ResourceWeights();
}
@Override
public String getName() {
return app.getApplicationId().toString();
}
public FSSchedulerApp getApp() {
return app;
}
public ResourceWeights getResourceWeights() {
return resourceWeights;
}
@Override
public void updateDemand() {
demand = Resources.createResource(0);
// Demand is current consumption plus outstanding requests
Resources.addTo(demand, app.getCurrentConsumption());
// Add up outstanding resource requests
synchronized (app) {
for (Priority p : app.getPriorities()) {
for (ResourceRequest r : app.getResourceRequests(p).values()) {
Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
Resources.addTo(demand, total);
}
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public Resource getResourceUsage() {
// Here getPreemptedResources() always returns zero, except in
// a preemption round
return Resources.subtract(app.getCurrentConsumption(),
app.getPreemptedResources());
}
@Override
public Resource getMinShare() {
return Resources.none();
}
@Override
public Resource getMaxShare() {
return Resources.unbounded();
}
/**
* Get metrics reference from containing queue.
*/
public QueueMetrics getMetrics() {
return queue.getMetrics();
}
@Override
public ResourceWeights getWeights() {
return scheduler.getAppWeight(this);
}
@Override
public Priority getPriority() {
// Right now per-app priorities are not passed to scheduler,
// so everyone has the same priority.
return priority;
}
/**
* Create and return a container object reflecting an allocation for the
* given application on the given node with the given capability and
* priority.
*/
public Container createContainer(
FSSchedulerApp application, FSSchedulerNode node,
Resource capability, Priority priority) {
NodeId nodeId = node.getRMNode().getNodeID();
ContainerId containerId = BuilderUtils.newContainerId(application
.getApplicationAttemptId(), application.getNewContainerId());
// Create the container
Container container =
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
.getHttpAddress(), capability, priority, null);
return container;
}
/**
* Reserve a spot for {@code container} on this {@code node}. If
* the container is {@code alreadyReserved} on the node, simply
* update relevant bookkeeping. This dispatches to relevant handlers
* in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
*/
private void reserve(Priority priority, FSSchedulerNode node,
Container container, boolean alreadyReserved) {
LOG.info("Making reservation: node=" + node.getNodeName() +
" app_id=" + app.getApplicationId());
if (!alreadyReserved) {
getMetrics().reserveResource(app.getUser(), container.getResource());
RMContainer rmContainer = app.reserve(node, priority, null,
container);
node.reserveResource(app, priority, rmContainer);
}
else {
RMContainer rmContainer = node.getReservedContainer();
app.reserve(node, priority, rmContainer, container);
node.reserveResource(app, priority, rmContainer);
}
}
/**
* Remove the reservation on {@code node} at the given
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
* handlers for an unreservation.
*/
public void unreserve(Priority priority, FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
app.unreserve(node, priority);
node.unreserveResource(app);
getMetrics().unreserveResource(
app.getUser(), rmContainer.getContainer().getResource());
}
/**
* Assign a container to this node to facilitate {@code request}. If the node
* does not have enough resources, create a reservation. This is called once we are
* sure the particular request should be facilitated by this node.
*
* @param node
* The node to try placing the container on.
* @param priority
* The requested priority for the container.
* @param request
* The ResourceRequest we're trying to satisfy.
* @param type
* The locality of the assignment.
* @param reserved
* Whether there's already a container reserved for this app on the node.
* @return
* If an assignment was made, returns the resources allocated to the
* container. If a reservation was made, returns
* FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was
* made, returns an empty resource.
*/
private Resource assignContainer(FSSchedulerNode node,
ResourceRequest request, NodeType type,
boolean reserved) {
// How much does this request need?
Resource capability = request.getCapability();
// How much does the node have?
Resource available = node.getAvailableResource();
Container container = null;
if (reserved) {
container = node.getReservedContainer().getContainer();
} else {
container = createContainer(app, node, capability, request.getPriority());
}
// Can we allocate a container on this node?
if (Resources.fitsIn(capability, available)) {
// Inform the application of the new container for this request
RMContainer allocatedContainer =
app.allocate(type, node, request.getPriority(), request, container);
if (allocatedContainer == null) {
// Did the application need this resource?
if (reserved) {
unreserve(request.getPriority(), node);
}
return Resources.none();
}
// If we had previously made a reservation, delete it
if (reserved) {
unreserve(request.getPriority(), node);
}
// Inform the node
node.allocateContainer(allocatedContainer);
// If this container is used to run AM, update the leaf queue's AM usage
if (app.getLiveContainers().size() == 1 &&
!app.getUnmanagedAM()) {
queue.addAMResourceUsage(container.getResource());
app.setAmRunning(true);
}
return container.getResource();
} else {
// The desired container won't fit here, so reserve
reserve(request.getPriority(), node, container, reserved);
return FairScheduler.CONTAINER_RESERVED;
}
}
private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
if (LOG.isDebugEnabled()) {
LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
}
Collection<Priority> prioritiesToTry = (reserved) ?
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
app.getPriorities();
// For each priority, see if we can schedule a node local, rack local
// or off-switch request. Rack or off-switch requests may be delayed
// (not scheduled) in order to promote better locality.
synchronized (app) {
for (Priority priority : prioritiesToTry) {
if (app.getTotalRequiredResources(priority) <= 0 ||
!hasContainerForNode(priority, node)) {
continue;
}
app.addSchedulingOpportunity(priority);
// Check the AM resource usage for the leaf queue
if (app.getLiveContainers().size() == 0
&& !app.getUnmanagedAM()) {
if (!queue.canRunAppAM(app.getAMResource())) {
return Resources.none();
}
}
ResourceRequest rackLocalRequest = app.getResourceRequest(priority,
node.getRackName());
ResourceRequest localRequest = app.getResourceRequest(priority,
node.getNodeName());
if (localRequest != null && !localRequest.getRelaxLocality()) {
LOG.warn("Relax locality off is not supported on local request: "
+ localRequest);
}
NodeType allowedLocality;
if (scheduler.isContinuousSchedulingEnabled()) {
allowedLocality = app.getAllowedLocalityLevelByTime(priority,
scheduler.getNodeLocalityDelayMs(),
scheduler.getRackLocalityDelayMs(),
scheduler.getClock().getTime());
} else {
allowedLocality = app.getAllowedLocalityLevel(priority,
scheduler.getNumClusterNodes(),
scheduler.getNodeLocalityThreshold(),
scheduler.getRackLocalityThreshold());
}
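// There are pending requests both on this specific node and on its rack, so make a node-local assignment.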
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& localRequest != null && localRequest.getNumContainers() != 0) {
return assignContainer(node, localRequest,
NodeType.NODE_LOCAL, reserved);
}
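// Locality relaxation is disabled at the rack level: skip rack-local and off-switch placement for this priority.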
if (rackLocalRequest != null && !rackLocalRequest.getRelaxLocality()) {
continue;
}
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
allowedLocality.equals(NodeType.OFF_SWITCH))) {
return assignContainer(node, rackLocalRequest,
NodeType.RACK_LOCAL, reserved);
}
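// Otherwise fall back to the off-switch (ANY) request, unless locality relaxation is disabled for it.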
ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
ResourceRequest.ANY);
if (offSwitchRequest != null && !offSwitchRequest.getRelaxLocality()) {
continue;
}
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, offSwitchRequest,
NodeType.OFF_SWITCH, reserved);
}
}
}
return Resources.none();
}
/**
* Called when this application already has an existing reservation on the
* given node. Sees whether we can turn the reservation into an allocation.
* Also checks whether the application needs the reservation anymore, and
* releases it if not.
*
* @param node
* Node that the application has an existing reservation on
*/
public Resource assignReservedContainer(FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
Priority priority = rmContainer.getReservedPriority();
// Make sure the application still needs requests at this priority
if (app.getTotalRequiredResources(priority) == 0) {
unreserve(priority, node);
return Resources.none();
}
// Fail early if the reserved container won't fit.
// Note that we have an assumption here that there's only one container size
// per priority.
if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
node.getAvailableResource())) {
return Resources.none();
}
return assignContainer(node, true);
}
@Override
public Resource assignContainer(FSSchedulerNode node) {
return assignContainer(node, false);
}
/**
* Preempt a running container according to the priority
*/
@Override
public RMContainer preemptContainer() {
if (LOG.isDebugEnabled()) {
LOG.debug("App " + getName() + " is going to preempt a running " +
"container");
}
RMContainer toBePreempted = null;
for (RMContainer container : app.getLiveContainers()) {
if (! app.getPreemptionContainers().contains(container) &&
(toBePreempted == null ||
comparator.compare(toBePreempted, container) > 0)) {
toBePreempted = container;
}
}
return toBePreempted;
}
/**
* Whether this app has containers requests that could be satisfied on the
* given node, if the node had full space.
*/
public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
ResourceRequest anyRequest = app.getResourceRequest(prio, ResourceRequest.ANY);
ResourceRequest rackRequest = app.getResourceRequest(prio, node.getRackName());
ResourceRequest nodeRequest = app.getResourceRequest(prio, node.getNodeName());
return
// There must be outstanding requests at the given priority:
anyRequest != null && anyRequest.getNumContainers() > 0 &&
// If locality relaxation is turned off at *-level, there must be a
// non-zero request for the node's rack:
(anyRequest.getRelaxLocality() ||
(rackRequest != null && rackRequest.getNumContainers() > 0)) &&
// If locality relaxation is turned off at rack-level, there must be a
// non-zero request at the node:
(rackRequest == null || rackRequest.getRelaxLocality() ||
(nodeRequest != null && nodeRequest.getNumContainers() > 0)) &&
// The requested container must be able to fit on the node:
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
anyRequest.getCapability(), node.getRMNode().getTotalCapability());
}
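// Orders preemption candidates by container priority; within a priority, containers with higher (more recently assigned) ids sort first, so preemptContainer() picks the most recently allocated container.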
static class RMContainerComparator implements Comparator<RMContainer>,
Serializable {
@Override
public int compare(RMContainer c1, RMContainer c2) {
int ret = c1.getContainer().getPriority().compareTo(
c2.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
}
}
}

FSSchedulerApp.java (deleted)

@@ -1,360 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Represents an application attempt from the viewpoint of the Fair Scheduler.
*/
@Private
@Unstable
public class FSSchedulerApp extends SchedulerApplicationAttempt {
private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class);
private AppSchedulable appSchedulable;
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
private Resource preemptedResources = Resources.createResource(0);
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
}
public void setAppSchedulable(AppSchedulable appSchedulable) {
this.appSchedulable = appSchedulable;
}
public AppSchedulable getAppSchedulable() {
return appSchedulable;
}
synchronized public void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Remove from the list of newly allocated containers if found
newlyAllocatedContainers.remove(rmContainer);
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(
containerId,
containerStatus,
event)
);
LOG.info("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState() + " event:" + event);
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
RMAuditLogger.logSuccess(getUser(),
AuditConstants.RELEASE_CONTAINER, "SchedulerApp",
getApplicationId(), containerId);
// Update usage metrics
Resource containerResource = rmContainer.getContainer().getResource();
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
Resources.subtractFrom(currentConsumption, containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
}
public synchronized void unreserve(FSSchedulerNode node, Priority priority) {
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
if (reservedContainers.isEmpty()) {
this.reservedContainers.remove(priority);
}
// Reset the re-reservation count
resetReReservations(priority);
Resource resource = reservedContainer.getContainer().getResource();
Resources.subtractFrom(currentReservation, resource);
LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
+ node + ", currently has " + reservedContainers.size() + " at priority "
+ priority + "; currentReservation " + currentReservation);
}
public synchronized float getLocalityWaitFactor(
Priority priority, int clusterNodes) {
// Estimate: Required unique resources (i.e. hosts + racks)
int requiredResources =
Math.max(this.getResourceRequests(priority).size() - 1, 0);
// waitFactor can't be more than '1'
// i.e. no point skipping more than cluster-size opportunities
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
}
/**
* Delay scheduling: We often want to prioritize scheduling of node-local
* containers over rack-local or off-switch containers. To achieve this
* we first only allow node-local assignments for a given priority level,
* then relax the locality threshold once we've had a long enough period
* without successfully scheduling. We measure both the number of "missed"
* scheduling opportunities since the last container was scheduled
* at the current allowed level and the time since the last container
* was scheduled. Currently we use only the former.
*/
// Current locality threshold
final Map<Priority, NodeType> allowedLocalityLevel = new HashMap<
Priority, NodeType>();
/**
* Return the level at which we are allowed to schedule containers, given the
* current size of the cluster and thresholds indicating how many nodes to
* fail at (as a fraction of cluster size) before relaxing scheduling
* constraints.
*/
public synchronized NodeType getAllowedLocalityLevel(Priority priority,
int numNodes, double nodeLocalityThreshold, double rackLocalityThreshold) {
// upper limit on threshold
if (nodeLocalityThreshold > 1.0) { nodeLocalityThreshold = 1.0; }
if (rackLocalityThreshold > 1.0) { rackLocalityThreshold = 1.0; }
// If delay scheduling is not being used, can schedule anywhere
if (nodeLocalityThreshold < 0.0 || rackLocalityThreshold < 0.0) {
return NodeType.OFF_SWITCH;
}
// Default level is NODE_LOCAL
if (!allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// If level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) return NodeType.OFF_SWITCH;
double threshold = allowed.equals(NodeType.NODE_LOCAL) ? nodeLocalityThreshold :
rackLocalityThreshold;
// Relax locality constraints once we've surpassed threshold.
if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority);
}
else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority);
}
}
return allowedLocalityLevel.get(priority);
}
/**
* Return the level at which we are allowed to schedule containers, given the
* thresholds indicating how much time must pass before relaxing
* scheduling constraints.
*/
public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority,
long nodeLocalityDelayMs, long rackLocalityDelayMs,
long currentTimeMs) {
// if not being used, can schedule anywhere
if (nodeLocalityDelayMs < 0 || rackLocalityDelayMs < 0) {
return NodeType.OFF_SWITCH;
}
// default level is NODE_LOCAL
if (! allowedLocalityLevel.containsKey(priority)) {
allowedLocalityLevel.put(priority, NodeType.NODE_LOCAL);
return NodeType.NODE_LOCAL;
}
NodeType allowed = allowedLocalityLevel.get(priority);
// if level is already most liberal, we're done
if (allowed.equals(NodeType.OFF_SWITCH)) {
return NodeType.OFF_SWITCH;
}
// check waiting time
long waitTime = currentTimeMs;
if (lastScheduledContainer.containsKey(priority)) {
waitTime -= lastScheduledContainer.get(priority);
} else {
waitTime -= appSchedulable.getStartTime();
}
long thresholdTime = allowed.equals(NodeType.NODE_LOCAL) ?
nodeLocalityDelayMs : rackLocalityDelayMs;
if (waitTime > thresholdTime) {
if (allowed.equals(NodeType.NODE_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
resetSchedulingOpportunities(priority, currentTimeMs);
} else if (allowed.equals(NodeType.RACK_LOCAL)) {
allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
resetSchedulingOpportunities(priority, currentTimeMs);
}
}
return allowedLocalityLevel.get(priority);
}
synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
Priority priority, ResourceRequest request,
Container container) {
// Update allowed locality level
NodeType allowed = allowedLocalityLevel.get(priority);
if (allowed != null) {
if (allowed.equals(NodeType.OFF_SWITCH) &&
(type.equals(NodeType.NODE_LOCAL) ||
type.equals(NodeType.RACK_LOCAL))) {
this.resetAllowedLocalityLevel(priority, type);
}
else if (allowed.equals(NodeType.RACK_LOCAL) &&
type.equals(NodeType.NODE_LOCAL)) {
this.resetAllowedLocalityLevel(priority, type);
}
}
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(priority) <= 0) {
return null;
}
// Create RMContainer
RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
// Update resource requests related to "request" and store in RMContainer
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId()
+ " container=" + container.getId() + " host="
+ container.getNodeId().getHost() + " type=" + type);
}
RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId());
return rmContainer;
}
/**
* Should be called when the scheduler assigns a container at a higher
* degree of locality than the current threshold. Reset the allowed locality
* level to a higher degree of locality.
*/
public synchronized void resetAllowedLocalityLevel(Priority priority,
NodeType level) {
NodeType old = allowedLocalityLevel.get(priority);
LOG.info("Raising locality level from " + old + " to " + level + " at " +
" priority " + priority);
allowedLocalityLevel.put(priority, level);
}
// Preemption-related methods
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
preemptionMap.put(container, time);
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
public Long getContainerPreemptionTime(RMContainer container) {
return preemptionMap.get(container);
}
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
@Override
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
}
public Resource getPreemptedResources() {
return preemptedResources;
}
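// Rebuild the preempted-resource accounting from the containers still marked for preemption.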
public void resetPreemptedResources() {
preemptedResources = Resources.createResource(0);
for (RMContainer container : getPreemptionContainers()) {
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
}
public void clearPreemptedResources() {
preemptedResources.setMemory(0);
preemptedResources.setVirtualCores(0);
}
}

TestFSSchedulerApp.java (deleted)

@@ -1,191 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSSchedulerApp {
private class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
return time;
}
public void tick(int seconds) {
time = time + seconds * 1000;
}
}
private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
ApplicationAttemptId attId =
ApplicationAttemptId.newInstance(appIdImpl, attemptId);
return attId;
}
@Test
public void testDelayScheduling() {
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
double nodeLocalityThreshold = .5;
double rackLocalityThreshold = .6;
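// With the 10-node cluster used below, these thresholds relax locality only after more than 5 (node-local) and 6 (rack-local) missed scheduling opportunities.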
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getEpoch()).thenReturn(0);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null,
rmContext);
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
// First five scheduling opportunities should remain node local
for (int i = 0; i < 5; i++) {
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
// After five it should switch to rack local
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
// Manually set back to node local
schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
schedulerApp.resetSchedulingOpportunities(prio);
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
// Now escalate again to rack-local, then to off-switch
for (int i = 0; i < 5; i++) {
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
for (int i = 0; i < 6; i++) {
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
schedulerApp.addSchedulingOpportunity(prio);
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
}
@Test
public void testDelaySchedulingForContinuousScheduling()
throws InterruptedException {
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
MockClock clock = new MockClock();
long nodeLocalityDelayMs = 5 * 1000L; // 5 seconds
long rackLocalityDelayMs = 6 * 1000L; // 6 seconds
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getEpoch()).thenReturn(0);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue,
null, rmContext);
AppSchedulable appSchedulable = Mockito.mock(AppSchedulable.class);
long startTime = clock.getTime();
Mockito.when(appSchedulable.getStartTime()).thenReturn(startTime);
schedulerApp.setAppSchedulable(appSchedulable);
// Default level should be node-local
assertEquals(NodeType.NODE_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// after 4 seconds should remain node local
clock.tick(4);
assertEquals(NodeType.NODE_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// after 6 seconds should switch to rack local
clock.tick(2);
assertEquals(NodeType.RACK_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// manually set back to node local
schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
schedulerApp.resetSchedulingOpportunities(prio, clock.getTime());
assertEquals(NodeType.NODE_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
// Now escalate again to rack-local, then to off-switch
clock.tick(6);
assertEquals(NodeType.RACK_LOCAL,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
clock.tick(7);
assertEquals(NodeType.OFF_SWITCH,
schedulerApp.getAllowedLocalityLevelByTime(prio,
nodeLocalityDelayMs, rackLocalityDelayMs, clock.getTime()));
}
@Test
/**
* Ensure that when negative parameters are given (signaling that delay scheduling
* is not in use), the least restrictive locality level is returned.
*/
public void testLocalityLevelWithoutDelays() {
FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
RMContext rmContext = Mockito.mock(RMContext.class);
Mockito.when(rmContext.getEpoch()).thenReturn(0);
ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, "user1", queue , null,
rmContext);
assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
prio, 10, -1.0, -1.0));
}
}