YARN-4752. Improved preemption in FairScheduler. (kasha)

This commit is contained in:
Karthik Kambatla 2017-01-25 10:45:02 -08:00
parent 558fee285a
commit 0ceb7149d9
22 changed files with 1499 additions and 2124 deletions

View File

@ -143,6 +143,10 @@ public static Resource createResource(long memory, int cores) {
public static Resource none() {
return NONE;
}
public static boolean isNone(Resource other) {
return NONE.equals(other);
}
public static Resource unbounded() {
return UNBOUNDED;

View File

@ -613,6 +613,26 @@ public Resource getResource(SchedulerRequestKey schedulerKey) {
}
}
/**
* Method to return the next resource request to be serviced.
*
* In the initial implementation, we just pick any {@link ResourceRequest}
* corresponding to the highest priority.
*
* @return next {@link ResourceRequest} to allocate resources for.
*/
@Unstable
public synchronized ResourceRequest getNextResourceRequest() {
SchedulingPlacementSet<SchedulerNode> ps = schedulerKeyToPlacementSets.get(
schedulerKeys.firstKey());
if (null != ps) {
for (ResourceRequest rr : ps.getResourceRequests().values()) {
return rr;
}
}
return null;
}
/**
* Returns if the place (node/rack today) is either blacklisted by the
* application (user) or the system

View File

@ -1262,6 +1262,22 @@ public void decUnconfirmedRes(Resource res) {
unconfirmedAllocatedVcores.addAndGet(-res.getVirtualCores());
}
@Override
public int hashCode() {
return getApplicationAttemptId().hashCode();
}
@Override
public boolean equals(Object o) {
if (! (o instanceof SchedulerApplicationAttempt)) {
return false;
}
SchedulerApplicationAttempt other = (SchedulerApplicationAttempt) o;
return (this == other ||
this.getApplicationAttemptId().equals(other.getApplicationAttemptId()));
}
/**
* Different state for Application Master, user can see this state from web UI
*/

View File

@ -1184,4 +1184,20 @@ public boolean moveReservation(RMContainer reservedContainer,
writeLock.unlock();
}
}
/*
* Overriding to appease findbugs
*/
@Override
public int hashCode() {
return super.hashCode();
}
/*
* Overriding to appease findbugs
*/
@Override
public boolean equals(Object o) {
return super.equals(o);
}
}

View File

@ -18,18 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -53,6 +52,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@ -78,10 +78,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
private ResourceWeights resourceWeights;
private Resource demand = Resources.createResource(0);
private FairScheduler scheduler;
private FSQueue fsQueue;
private Resource fairShare = Resources.createResource(0, 0);
private Resource preemptedResources = Resources.createResource(0);
private RMContainerComparator comparator = new RMContainerComparator();
private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
// Preemption related variables
private final Resource preemptedResources = Resources.clone(Resources.none());
private final Set<RMContainer> containersToPreempt = new HashSet<>();
private Resource fairshareStarvation = Resources.none();
private long lastTimeAtFairShare;
// minShareStarvation attributed to this application by the leaf queue
private Resource minshareStarvation = Resources.none();
// Used to record node reservation by an app.
// Key = RackName, Value = Set of Nodes reserved by app on rack
@ -107,12 +114,14 @@ public FSAppAttempt(FairScheduler scheduler,
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
this.scheduler = scheduler;
this.fsQueue = queue;
this.startTime = scheduler.getClock().getTime();
this.lastTimeAtFairShare = this.startTime;
this.appPriority = Priority.newInstance(1);
this.resourceWeights = new ResourceWeights();
}
public ResourceWeights getResourceWeights() {
ResourceWeights getResourceWeights() {
return resourceWeights;
}
@ -123,7 +132,7 @@ public QueueMetrics getMetrics() {
return queue.getMetrics();
}
public void containerCompleted(RMContainer rmContainer,
void containerCompleted(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
try {
writeLock.lock();
@ -143,6 +152,7 @@ public void containerCompleted(RMContainer rmContainer,
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());
untrackContainerForPreemption(rmContainer);
Resource containerResource = rmContainer.getContainer().getResource();
RMAuditLogger.logSuccess(getUser(), AuditConstants.RELEASE_CONTAINER,
@ -152,9 +162,6 @@ public void containerCompleted(RMContainer rmContainer,
queue.getMetrics().releaseResources(getUser(), 1, containerResource);
this.attemptResourceUsage.decUsed(containerResource);
// remove from preemption map if it is completed
preemptionMap.remove(rmContainer);
// Clear resource utilization metrics cache.
lastMemoryAggregateAllocationUpdateTime = -1;
} finally {
@ -484,7 +491,7 @@ public RMContainer allocate(NodeType type, FSSchedulerNode node,
* @param schedulerKey Scheduler Key
* @param level NodeType
*/
public void resetAllowedLocalityLevel(
void resetAllowedLocalityLevel(
SchedulerRequestKey schedulerKey, NodeType level) {
NodeType old;
try {
@ -498,57 +505,113 @@ public void resetAllowedLocalityLevel(
+ " priority " + schedulerKey.getPriority());
}
// related methods
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
try {
writeLock.lock();
preemptionMap.put(container, time);
Resources.addTo(preemptedResources, container.getAllocatedResource());
} finally {
writeLock.unlock();
}
}
public Long getContainerPreemptionTime(RMContainer container) {
return preemptionMap.get(container);
}
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
@Override
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
Queue queue = super.getQueue();
assert queue instanceof FSLeafQueue;
return (FSLeafQueue) queue;
}
public Resource getPreemptedResources() {
return preemptedResources;
// Preemption related methods
/**
* Get overall starvation - fairshare and attributed minshare.
*
* @return total starvation attributed to this application
*/
Resource getStarvation() {
return Resources.add(fairshareStarvation, minshareStarvation);
}
public void resetPreemptedResources() {
preemptedResources = Resources.createResource(0);
for (RMContainer container : getPreemptionContainers()) {
/**
* Set the minshare attributed to this application. To be called only from
* {@link FSLeafQueue#updateStarvedApps}.
*
* @param starvation minshare starvation attributed to this app
*/
void setMinshareStarvation(Resource starvation) {
this.minshareStarvation = starvation;
}
/**
* Reset the minshare starvation attributed to this application. To be
* called only from {@link FSLeafQueue#updateStarvedApps}
*/
void resetMinshareStarvation() {
this.minshareStarvation = Resources.none();
}
void trackContainerForPreemption(RMContainer container) {
containersToPreempt.add(container);
synchronized (preemptedResources) {
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
}
public void clearPreemptedResources() {
preemptedResources.setMemorySize(0);
preemptedResources.setVirtualCores(0);
private void untrackContainerForPreemption(RMContainer container) {
synchronized (preemptedResources) {
Resources.subtractFrom(preemptedResources,
container.getAllocatedResource());
}
containersToPreempt.remove(container);
}
Set<RMContainer> getPreemptionContainers() {
return containersToPreempt;
}
private Resource getPreemptedResources() {
synchronized (preemptedResources) {
return preemptedResources;
}
}
boolean canContainerBePreempted(RMContainer container) {
// Sanity check that the app owns this container
if (!getLiveContainersMap().containsKey(container.getContainerId()) &&
!newlyAllocatedContainers.contains(container)) {
LOG.error("Looking to preempt container " + container +
". Container does not belong to app " + getApplicationId());
return false;
}
if (containersToPreempt.contains(container)) {
// The container is already under consideration for preemption
return false;
}
// Check if any of the parent queues are not preemptable
// TODO (YARN-5831): Propagate the "preemptable" flag all the way down to
// the app to avoid recursing up every time.
for (FSQueue q = getQueue();
!q.getQueueName().equals("root");
q = q.getParent()) {
if (!q.isPreemptable()) {
return false;
}
}
// Check if the app's allocation will be over its fairshare even
// after preempting this container
Resource currentUsage = getResourceUsage();
Resource fairshare = getFairShare();
Resource overFairShareBy = Resources.subtract(currentUsage, fairshare);
return (Resources.fitsIn(container.getAllocatedResource(),
overFairShareBy));
}
/**
* Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and
* given application on the given node with the given capability and
* priority.
*
* @param node Node
* @param capability Capability
* @param schedulerKey Scheduler Key
* @return Container
*/
public Container createContainer(FSSchedulerNode node, Resource capability,
private Container createContainer(FSSchedulerNode node, Resource capability,
SchedulerRequestKey schedulerKey) {
NodeId nodeId = node.getRMNode().getNodeID();
@ -556,12 +619,10 @@ public Container createContainer(FSSchedulerNode node, Resource capability,
getApplicationAttemptId(), getNewContainerId());
// Create the container
Container container = BuilderUtils.newContainer(containerId, nodeId,
return BuilderUtils.newContainer(containerId, nodeId,
node.getRMNode().getHttpAddress(), capability,
schedulerKey.getPriority(), null,
schedulerKey.getAllocationRequestId());
return container;
}
/**
@ -816,7 +877,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
}
Collection<SchedulerRequestKey> keysToTry = (reserved) ?
Arrays.asList(node.getReservedContainer().getReservedSchedulerKey()) :
Collections.singletonList(
node.getReservedContainer().getReservedSchedulerKey()) :
getSchedulerKeys();
// For each priority, see if we can schedule a node local, rack local
@ -974,7 +1036,7 @@ private boolean isValidReservation(FSSchedulerNode node) {
* Node that the application has an existing reservation on
* @return whether the reservation on the given node is valid.
*/
public boolean assignReservedContainer(FSSchedulerNode node) {
boolean assignReservedContainer(FSSchedulerNode node) {
RMContainer rmContainer = node.getReservedContainer();
SchedulerRequestKey reservedSchedulerKey =
rmContainer.getReservedSchedulerKey();
@ -1003,17 +1065,43 @@ public boolean assignReservedContainer(FSSchedulerNode node) {
return true;
}
static class RMContainerComparator implements Comparator<RMContainer>,
Serializable {
@Override
public int compare(RMContainer c1, RMContainer c2) {
int ret = c1.getContainer().getPriority().compareTo(
c2.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
/**
* Helper method that computes the extent of fairshare fairshareStarvation.
*/
Resource fairShareStarvation() {
Resource threshold = Resources.multiply(
getFairShare(), fsQueue.getFairSharePreemptionThreshold());
Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
long now = scheduler.getClock().getTime();
boolean starved = Resources.greaterThan(
fsQueue.getPolicy().getResourceCalculator(),
scheduler.getClusterResource(), starvation, Resources.none());
if (!starved) {
lastTimeAtFairShare = now;
}
if (starved &&
(now - lastTimeAtFairShare > fsQueue.getFairSharePreemptionTimeout())) {
this.fairshareStarvation = starvation;
} else {
this.fairshareStarvation = Resources.none();
}
return this.fairshareStarvation;
}
ResourceRequest getNextResourceRequest() {
return appSchedulingInfo.getNextResourceRequest();
}
/**
* Helper method that captures if this app is identified to be starved.
* @return true if the app is starved for fairshare, false otherwise
*/
@VisibleForTesting
boolean isStarvedForFairShare() {
return !Resources.isNone(fairshareStarvation);
}
/* Schedulable methods implementation */
@ -1045,14 +1133,13 @@ public Resource getMaxShare() {
@Override
public Resource getResourceUsage() {
// Here the getPreemptedResources() always return zero, except in
// a preemption round
// In the common case where preempted resource is zero, return the
// current consumption Resource object directly without calling
// Resources.subtract which creates a new Resource object for each call.
return getPreemptedResources().equals(Resources.none()) ?
getCurrentConsumption() :
Resources.subtract(getCurrentConsumption(), getPreemptedResources());
/*
* getResourcesToPreempt() returns zero, except when there are containers
* to preempt. Avoid creating an object in the common case.
*/
return getPreemptedResources().equals(Resources.none())
? getCurrentConsumption()
: Resources.subtract(getCurrentConsumption(), getPreemptedResources());
}
@Override
@ -1131,24 +1218,19 @@ private void updateAMDiagnosticMsg(Resource resource, String reason) {
diagnosticMessageBldr.toString());
}
/**
* Preempt a running container according to the priority
/*
* Overriding to appease findbugs
*/
@Override
public RMContainer preemptContainer() {
if (LOG.isDebugEnabled()) {
LOG.debug("App " + getName() + " is going to preempt a running " +
"container");
}
public int hashCode() {
return super.hashCode();
}
RMContainer toBePreempted = null;
for (RMContainer container : getLiveContainers()) {
if (!getPreemptionContainers().contains(container) &&
(toBePreempted == null ||
comparator.compare(toBePreempted, container) > 0)) {
toBePreempted = container;
}
}
return toBePreempted;
/*
* Overriding to appease findbugs
*/
@Override
public boolean equals(Object o) {
return super.equals(o);
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
/**
* Helper class that holds basic information to be passed around
* FairScheduler classes. Think of this as a glorified map that holds key
* information about the scheduler.
*/
public class FSContext {
// Preemption-related info
private boolean preemptionEnabled = false;
private float preemptionUtilizationThreshold;
private FSStarvedApps starvedApps;
public boolean isPreemptionEnabled() {
return preemptionEnabled;
}
public void setPreemptionEnabled() {
this.preemptionEnabled = true;
if (starvedApps == null) {
starvedApps = new FSStarvedApps();
}
}
public FSStarvedApps getStarvedApps() {
return starvedApps;
}
public float getPreemptionUtilizationThreshold() {
return preemptionUtilizationThreshold;
}
public void setPreemptionUtilizationThreshold(
float preemptionUtilizationThreshold) {
this.preemptionUtilizationThreshold = preemptionUtilizationThreshold;
}
}

View File

@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -45,16 +44,20 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import static org.apache.hadoop.yarn.util.resource.Resources.none;
@Private
@Unstable
public class FSLeafQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName());
private static final Log LOG = LogFactory.getLog(FSLeafQueue.class.getName());
private static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
private final List<FSAppAttempt> runnableApps = // apps that are runnable
new ArrayList<FSAppAttempt>();
private final List<FSAppAttempt> nonRunnableApps =
new ArrayList<FSAppAttempt>();
private FairScheduler scheduler;
private FSContext context;
// apps that are runnable
private final List<FSAppAttempt> runnableApps = new ArrayList<>();
private final List<FSAppAttempt> nonRunnableApps = new ArrayList<>();
// get a lock with fair distribution for app list updates
private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
private final Lock readLock = rwl.readLock();
@ -64,25 +67,24 @@ public class FSLeafQueue extends FSQueue {
// Variables used for preemption
private long lastTimeAtMinShare;
private long lastTimeAtFairShareThreshold;
// Track the AM resource usage for this queue
private Resource amResourceUsage;
private final ActiveUsersManager activeUsersManager;
public static final List<FSQueue> EMPTY_LIST = Collections.emptyList();
public FSLeafQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
super(name, scheduler, parent);
this.scheduler = scheduler;
this.context = scheduler.getContext();
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
amResourceUsage = Resource.newInstance(0, 0);
getMetrics().setAMResourceUsage(amResourceUsage);
}
public void addApp(FSAppAttempt app, boolean runnable) {
void addApp(FSAppAttempt app, boolean runnable) {
writeLock.lock();
try {
if (runnable) {
@ -109,7 +111,7 @@ void addAppSchedulable(FSAppAttempt appSched) {
* Removes the given app from this queue.
* @return whether or not the app was runnable
*/
public boolean removeApp(FSAppAttempt app) {
boolean removeApp(FSAppAttempt app) {
boolean runnable = false;
// Remove app from runnable/nonRunnable list while holding the write lock
@ -141,7 +143,7 @@ public boolean removeApp(FSAppAttempt app) {
* Removes the given app if it is non-runnable and belongs to this queue
* @return true if the app is removed, false otherwise
*/
public boolean removeNonRunnableApp(FSAppAttempt app) {
boolean removeNonRunnableApp(FSAppAttempt app) {
writeLock.lock();
try {
return nonRunnableApps.remove(app);
@ -150,7 +152,7 @@ public boolean removeNonRunnableApp(FSAppAttempt app) {
}
}
public boolean isRunnableApp(FSAppAttempt attempt) {
boolean isRunnableApp(FSAppAttempt attempt) {
readLock.lock();
try {
return runnableApps.contains(attempt);
@ -159,7 +161,7 @@ public boolean isRunnableApp(FSAppAttempt attempt) {
}
}
public boolean isNonRunnableApp(FSAppAttempt attempt) {
boolean isNonRunnableApp(FSAppAttempt attempt) {
readLock.lock();
try {
return nonRunnableApps.contains(attempt);
@ -168,30 +170,8 @@ public boolean isNonRunnableApp(FSAppAttempt attempt) {
}
}
public void resetPreemptedResources() {
readLock.lock();
try {
for (FSAppAttempt attempt : runnableApps) {
attempt.resetPreemptedResources();
}
} finally {
readLock.unlock();
}
}
public void clearPreemptedResources() {
readLock.lock();
try {
for (FSAppAttempt attempt : runnableApps) {
attempt.clearPreemptedResources();
}
} finally {
readLock.unlock();
}
}
public List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
List<FSAppAttempt> appsToReturn = new ArrayList<FSAppAttempt>();
List<FSAppAttempt> getCopyOfNonRunnableAppSchedulables() {
List<FSAppAttempt> appsToReturn = new ArrayList<>();
readLock.lock();
try {
appsToReturn.addAll(nonRunnableApps);
@ -225,17 +205,78 @@ public void setPolicy(SchedulingPolicy policy)
}
super.policy = policy;
}
@Override
public void recomputeShares() {
public void updateInternal(boolean checkStarvation) {
readLock.lock();
try {
policy.computeShares(runnableApps, getFairShare());
if (checkStarvation) {
updateStarvedApps();
}
} finally {
readLock.unlock();
}
}
/**
* Helper method to identify starved applications. This needs to be called
* ONLY from {@link #updateInternal}, after the application shares
* are updated.
*
* A queue can be starving due to fairshare or minshare.
*
* Minshare is defined only on the queue and not the applications.
* Fairshare is defined for both the queue and the applications.
*
* If this queue is starved due to minshare, we need to identify the most
* deserving apps if they themselves are not starved due to fairshare.
*
* If this queue is starving due to fairshare, there must be at least
* one application that is starved. And, even if the queue is not
* starved due to fairshare, there might still be starved applications.
*/
private void updateStarvedApps() {
// First identify starved applications and track total amount of
// starvation (in resources)
Resource fairShareStarvation = Resources.clone(none());
// Fetch apps with unmet demand sorted by fairshare starvation
TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
for (FSAppAttempt app : appsWithDemand) {
Resource appStarvation = app.fairShareStarvation();
if (!Resources.equals(Resources.none(), appStarvation)) {
context.getStarvedApps().addStarvedApp(app);
Resources.addTo(fairShareStarvation, appStarvation);
} else {
break;
}
}
// Compute extent of minshare starvation
Resource minShareStarvation = minShareStarvation();
// Compute minshare starvation that is not subsumed by fairshare starvation
Resources.subtractFrom(minShareStarvation, fairShareStarvation);
// Keep adding apps to the starved list until the unmet demand goes over
// the remaining minshare
for (FSAppAttempt app : appsWithDemand) {
if (Resources.greaterThan(policy.getResourceCalculator(),
scheduler.getClusterResource(), minShareStarvation, none())) {
Resource appPendingDemand =
Resources.subtract(app.getDemand(), app.getResourceUsage());
Resources.subtractFrom(minShareStarvation, appPendingDemand);
app.setMinshareStarvation(appPendingDemand);
context.getStarvedApps().addStarvedApp(app);
} else {
// Reset minshare starvation in case we had set it in a previous
// iteration
app.resetMinshareStarvation();
}
}
}
@Override
public Resource getDemand() {
return demand;
@ -258,7 +299,7 @@ public Resource getResourceUsage() {
return usage;
}
public Resource getAmResourceUsage() {
Resource getAmResourceUsage() {
return amResourceUsage;
}
@ -301,7 +342,7 @@ private void updateDemandForApp(FSAppAttempt sched) {
@Override
public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none();
Resource assigned = none();
if (LOG.isDebugEnabled()) {
LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
getName() + " fairShare: " + getFairShare());
@ -311,26 +352,12 @@ public Resource assignContainer(FSSchedulerNode node) {
return assigned;
}
// Apps that have resource demands.
TreeSet<FSAppAttempt> pendingForResourceApps =
new TreeSet<FSAppAttempt>(policy.getComparator());
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
Resource pending = app.getAppAttemptResourceUsage().getPending();
if (!pending.equals(Resources.none())) {
pendingForResourceApps.add(app);
}
}
} finally {
readLock.unlock();
}
for (FSAppAttempt sched : pendingForResourceApps) {
for (FSAppAttempt sched : fetchAppsWithDemand()) {
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
continue;
}
assigned = sched.assignContainer(node);
if (!assigned.equals(Resources.none())) {
if (!assigned.equals(none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned container in queue:" + getName() + " " +
"container:" + assigned);
@ -341,40 +368,21 @@ public Resource assignContainer(FSSchedulerNode node) {
return assigned;
}
@Override
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
// If this queue is not over its fair share, reject
if (!preemptContainerPreCheck()) {
return toBePreempted;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Queue " + getName() + " is going to preempt a container " +
"from its applications.");
}
// Choose the app that is most over fair share
Comparator<Schedulable> comparator = policy.getComparator();
FSAppAttempt candidateSched = null;
private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
TreeSet<FSAppAttempt> pendingForResourceApps =
new TreeSet<>(policy.getComparator());
readLock.lock();
try {
for (FSAppAttempt sched : runnableApps) {
if (candidateSched == null ||
comparator.compare(sched, candidateSched) > 0) {
candidateSched = sched;
for (FSAppAttempt app : runnableApps) {
Resource pending = app.getAppAttemptResourceUsage().getPending();
if (!pending.equals(none())) {
pendingForResourceApps.add(app);
}
}
} finally {
readLock.unlock();
}
// Preempt from the selected app
if (candidateSched != null) {
toBePreempted = candidateSched.preemptContainer();
}
return toBePreempted;
return pendingForResourceApps;
}
@Override
@ -386,7 +394,7 @@ public List<FSQueue> getChildQueues() {
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
QueueUserACLInfo userAclInfo =
recordFactory.newRecordInstance(QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>();
List<QueueACL> operations = new ArrayList<>();
for (QueueACL operation : QueueACL.values()) {
if (hasAccess(operation, user)) {
operations.add(operation);
@ -398,23 +406,10 @@ public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
return Collections.singletonList(userAclInfo);
}
public long getLastTimeAtMinShare() {
return lastTimeAtMinShare;
}
private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
this.lastTimeAtMinShare = lastTimeAtMinShare;
}
public long getLastTimeAtFairShareThreshold() {
return lastTimeAtFairShareThreshold;
}
private void setLastTimeAtFairShareThreshold(
long lastTimeAtFairShareThreshold) {
this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
}
@Override
public int getNumRunnableApps() {
readLock.lock();
@ -425,7 +420,7 @@ public int getNumRunnableApps() {
}
}
public int getNumNonRunnableApps() {
int getNumNonRunnableApps() {
readLock.lock();
try {
return nonRunnableApps.size();
@ -483,6 +478,8 @@ public ActiveUsersManager getActiveUsersManager() {
* @return the maximum resource AM can use
*/
private Resource computeMaxAMResource() {
// If FairShare is zero, use min(maxShare, available resource) to compute
// maxAMResource
Resource maxResource = Resources.clone(getFairShare());
if (maxResource.getMemorySize() == 0) {
maxResource.setMemorySize(
@ -517,7 +514,7 @@ public boolean canRunAppAM(Resource amResource) {
return Resources.fitsIn(ifRunAMResource, maxAMResource);
}
public void addAMResourceUsage(Resource amResource) {
void addAMResourceUsage(Resource amResource) {
if (amResource != null) {
Resources.addTo(amResourceUsage, amResource);
getMetrics().setAMResourceUsage(amResourceUsage);
@ -531,21 +528,8 @@ public void recoverContainer(Resource clusterResource,
}
/**
* Update the preemption fields for the queue, i.e. the times since last was
* at its guaranteed share and over its fair share threshold.
*/
public void updateStarvationStats() {
long now = scheduler.getClock().getTime();
if (!isStarvedForMinShare()) {
setLastTimeAtMinShare(now);
}
if (!isStarvedForFairShare()) {
setLastTimeAtFairShareThreshold(now);
}
}
/** Allows setting weight for a dynamically created queue
* Currently only used for reservation based queues
* Allows setting weight for a dynamically created queue.
* Currently only used for reservation based queues.
* @param weight queue weight
*/
public void setWeights(float weight) {
@ -553,37 +537,61 @@ public void setWeights(float weight) {
}
/**
* Helper method to check if the queue should preempt containers
* Helper method to compute the amount of minshare starvation.
*
* @return true if check passes (can preempt) or false otherwise
* @return the extent of minshare starvation
*/
private boolean preemptContainerPreCheck() {
return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
getFairShare());
}
/**
* Is a queue being starved for its min share.
*/
@VisibleForTesting
boolean isStarvedForMinShare() {
return isStarved(getMinShare());
}
/**
* Is a queue being starved for its fair share threshold.
*/
@VisibleForTesting
boolean isStarvedForFairShare() {
return isStarved(
Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
}
private boolean isStarved(Resource share) {
private Resource minShareStarvation() {
// If demand < minshare, we should use demand to determine starvation
Resource desiredShare = Resources.min(policy.getResourceCalculator(),
scheduler.getClusterResource(), share, getDemand());
Resource resourceUsage = getResourceUsage();
return Resources.lessThan(policy.getResourceCalculator(),
scheduler.getClusterResource(), resourceUsage, desiredShare);
scheduler.getClusterResource(), getMinShare(), getDemand());
Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
boolean starved = !Resources.isNone(starvation);
long now = scheduler.getClock().getTime();
if (!starved) {
// Record that the queue is not starved
setLastTimeAtMinShare(now);
}
if (now - lastTimeAtMinShare < getMinSharePreemptionTimeout()) {
// the queue is not starved for the preemption timeout
starvation = Resources.clone(Resources.none());
}
return starvation;
}
/**
* Helper method for tests to check if a queue is starved for minShare.
* @return whether starved for minshare
*/
@VisibleForTesting
private boolean isStarvedForMinShare() {
return !Resources.isNone(minShareStarvation());
}
/**
* Helper method for tests to check if a queue is starved for fairshare.
* @return whether starved for fairshare
*/
@VisibleForTesting
private boolean isStarvedForFairShare() {
for (FSAppAttempt app : runnableApps) {
if (app.isStarvedForFairShare()) {
return true;
}
}
return false;
}
/**
* Helper method for tests to check if a queue is starved.
* @return whether starved for either minshare or fairshare
*/
@VisibleForTesting
boolean isStarved() {
return isStarvedForMinShare() || isStarvedForFairShare();
}
}

View File

@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@ -61,7 +60,7 @@ public FSParentQueue(String name, FairScheduler scheduler,
super(name, scheduler, parent);
}
public void addChildQueue(FSQueue child) {
void addChildQueue(FSQueue child) {
writeLock.lock();
try {
childQueues.add(child);
@ -70,7 +69,7 @@ public void addChildQueue(FSQueue child) {
}
}
public void removeChildQueue(FSQueue child) {
void removeChildQueue(FSQueue child) {
writeLock.lock();
try {
childQueues.remove(child);
@ -80,20 +79,20 @@ public void removeChildQueue(FSQueue child) {
}
@Override
public void recomputeShares() {
public void updateInternal(boolean checkStarvation) {
readLock.lock();
try {
policy.computeShares(childQueues, getFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setFairShare(childQueue.getFairShare());
childQueue.recomputeShares();
childQueue.updateInternal(checkStarvation);
}
} finally {
readLock.unlock();
}
}
public void recomputeSteadyShares() {
void recomputeSteadyShares() {
readLock.lock();
try {
policy.computeSteadyShares(childQueues, getSteadyFairShare());
@ -188,7 +187,7 @@ private QueueUserACLInfo getUserAclInfo(UserGroupInformation user) {
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
List<QueueUserACLInfo> userAcls = new ArrayList<>();
// Add queue acls
userAcls.add(getUserAclInfo(user));
@ -245,39 +244,6 @@ public Resource assignContainer(FSSchedulerNode node) {
return assigned;
}
@Override
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
// Find the childQueue which is most over fair share
FSQueue candidateQueue = null;
Comparator<Schedulable> comparator = policy.getComparator();
readLock.lock();
try {
for (FSQueue queue : childQueues) {
// Skip selection for non-preemptable queue
if (!queue.isPreemptable()) {
if (LOG.isDebugEnabled()) {
LOG.debug("skipping from queue=" + getName()
+ " because it's a non-preemptable queue");
}
} else if (candidateQueue == null ||
comparator.compare(queue, candidateQueue) > 0) {
candidateQueue = queue;
}
}
} finally {
readLock.unlock();
}
// Let the selected queue choose which of its container to preempt
if (candidateQueue != null) {
toBePreempted = candidateQueue.preemptContainer();
}
return toBePreempted;
}
@Override
public List<FSQueue> getChildQueues() {
readLock.lock();
@ -300,8 +266,8 @@ public void setPolicy(SchedulingPolicy policy)
}
super.policy = policy;
}
public void incrementRunnableApps() {
void incrementRunnableApps() {
writeLock.lock();
try {
runnableApps++;
@ -310,7 +276,7 @@ public void incrementRunnableApps() {
}
}
public void decrementRunnableApps() {
void decrementRunnableApps() {
writeLock.lock();
try {
runnableApps--;

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
/**
* Thread that handles FairScheduler preemption.
*/
class FSPreemptionThread extends Thread {
private static final Log LOG = LogFactory.getLog(FSPreemptionThread.class);
protected final FSContext context;
private final FairScheduler scheduler;
private final long warnTimeBeforeKill;
private final Timer preemptionTimer;
FSPreemptionThread(FairScheduler scheduler) {
this.scheduler = scheduler;
this.context = scheduler.getContext();
FairSchedulerConfiguration fsConf = scheduler.getConf();
context.setPreemptionEnabled();
context.setPreemptionUtilizationThreshold(
fsConf.getPreemptionUtilizationThreshold());
warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
preemptionTimer = new Timer("Preemption Timer", true);
setDaemon(true);
setName("FSPreemptionThread");
}
public void run() {
while (!Thread.interrupted()) {
FSAppAttempt starvedApp;
try{
starvedApp = context.getStarvedApps().take();
if (!Resources.isNone(starvedApp.getStarvation())) {
List<RMContainer> containers =
identifyContainersToPreempt(starvedApp);
if (containers != null) {
preemptContainers(containers);
}
}
} catch (InterruptedException e) {
LOG.info("Preemption thread interrupted! Exiting.");
return;
}
}
}
/**
* Given an app, identify containers to preempt to satisfy the app's next
* resource request.
*
* @param starvedApp starved application for which we are identifying
* preemption targets
* @return list of containers to preempt to satisfy starvedApp, null if the
* app cannot be satisfied by preempting any running containers
*/
private List<RMContainer> identifyContainersToPreempt(
FSAppAttempt starvedApp) {
List<RMContainer> containers = new ArrayList<>(); // return value
// Find the nodes that match the next resource request
ResourceRequest request = starvedApp.getNextResourceRequest();
// TODO (KK): Should we check other resource requests if we can't match
// the first one?
Resource requestCapability = request.getCapability();
List<FSSchedulerNode> potentialNodes =
scheduler.getNodeTracker().getNodesByResourceName(
request.getResourceName());
// From the potential nodes, pick a node that has enough containers
// from apps over their fairshare
for (FSSchedulerNode node : potentialNodes) {
// Reset containers for the new node being considered.
containers.clear();
// TODO (YARN-5829): Attempt to reserve the node for starved app. The
// subsequent if-check needs to be reworked accordingly.
FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
// This node is already reserved by another app. Let us not consider
// this for preemption.
continue;
}
// Figure out list of containers to consider
List<RMContainer> containersToCheck =
node.getCopiedListOfRunningContainers();
containersToCheck.removeAll(node.getContainersForPreemption());
// Initialize potential with unallocated resources
Resource potential = Resources.clone(node.getUnallocatedResource());
for (RMContainer container : containersToCheck) {
FSAppAttempt app =
scheduler.getSchedulerApp(container.getApplicationAttemptId());
if (app.canContainerBePreempted(container)) {
// Flag container for preemption
containers.add(container);
Resources.addTo(potential, container.getAllocatedResource());
}
// Check if we have already identified enough containers
if (Resources.fitsIn(requestCapability, potential)) {
// Mark the containers as being considered for preemption on the node.
// Make sure the containers are subsequently removed by calling
// FSSchedulerNode#removeContainerForPreemption.
node.addContainersForPreemption(containers);
return containers;
} else {
// TODO (YARN-5829): Unreserve the node for the starved app.
}
}
}
return null;
}
private void preemptContainers(List<RMContainer> containers) {
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container " + container +
" from queue " + queue.getName());
app.trackContainerForPreemption(container);
}
// Schedule timer task to kill containers
preemptionTimer.schedule(
new PreemptContainersTask(containers), warnTimeBeforeKill);
}
private class PreemptContainersTask extends TimerTask {
private List<RMContainer> containers;
PreemptContainersTask(List<RMContainer> containers) {
this.containers = containers;
}
@Override
public void run() {
for (RMContainer container : containers) {
ContainerStatus status = SchedulerUtils.createPreemptedContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
LOG.info("Killing container " + container);
scheduler.completedContainer(
container, status, RMContainerEventType.KILL);
FSSchedulerNode containerNode = (FSSchedulerNode)
scheduler.getNodeTracker().getNode(container.getAllocatedNode());
containerNode.removeContainerForPreemption(container);
}
}
}
}

View File

@ -260,7 +260,7 @@ public Resource getSteadyFairShare() {
return steadyFairShare;
}
public void setSteadyFairShare(Resource steadyFairShare) {
void setSteadyFairShare(Resource steadyFairShare) {
this.steadyFairShare = steadyFairShare;
metrics.setSteadyFairShare(steadyFairShare);
}
@ -269,27 +269,27 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
}
public long getFairSharePreemptionTimeout() {
long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
public void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
void setFairSharePreemptionTimeout(long fairSharePreemptionTimeout) {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
}
public long getMinSharePreemptionTimeout() {
long getMinSharePreemptionTimeout() {
return minSharePreemptionTimeout;
}
public void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
void setMinSharePreemptionTimeout(long minSharePreemptionTimeout) {
this.minSharePreemptionTimeout = minSharePreemptionTimeout;
}
public float getFairSharePreemptionThreshold() {
float getFairSharePreemptionThreshold() {
return fairSharePreemptionThreshold;
}
public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
}
@ -299,9 +299,17 @@ public boolean isPreemptable() {
/**
* Recomputes the shares for all child queues and applications based on this
* queue's current share
* queue's current share, and checks for starvation.
*
* @param checkStarvation whether to check for fairshare or minshare
* starvation on update
*/
public abstract void recomputeShares();
abstract void updateInternal(boolean checkStarvation);
public void update(Resource fairShare, boolean checkStarvation) {
setFairShare(fairShare);
updateInternal(checkStarvation);
}
/**
* Update the min/fair share preemption timeouts, threshold and preemption
@ -354,7 +362,7 @@ public abstract void collectSchedulerApplications(
*
* @return true if check passes (can assign) or false otherwise
*/
protected boolean assignContainerPreCheck(FSSchedulerNode node) {
boolean assignContainerPreCheck(FSSchedulerNode node) {
if (!Resources.fitsIn(getResourceUsage(), maxShare)
|| node.getReservedContainer() != null) {
return false;
@ -410,7 +418,7 @@ public Priority getDefaultApplicationPriority() {
return null;
}
public boolean fitsInMaxShare(Resource additionalResource) {
boolean fitsInMaxShare(Resource additionalResource) {
Resource usagePlusAddition =
Resources.add(getResourceUsage(), additionalResource);

View File

@ -29,6 +29,10 @@
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@Private
@Unstable
public class FSSchedulerNode extends SchedulerNode {
@ -36,6 +40,8 @@ public class FSSchedulerNode extends SchedulerNode {
private static final Log LOG = LogFactory.getLog(FSSchedulerNode.class);
private FSAppAttempt reservedAppSchedulable;
private final Set<RMContainer> containersForPreemption =
new ConcurrentSkipListSet<>();
public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
super(node, usePortForNodeName);
@ -99,8 +105,36 @@ public synchronized void unreserveResource(
this.reservedAppSchedulable = null;
}
public synchronized FSAppAttempt getReservedAppSchedulable() {
synchronized FSAppAttempt getReservedAppSchedulable() {
return reservedAppSchedulable;
}
/**
* Mark {@code containers} as being considered for preemption so they are
* not considered again. A call to this requires a corresponding call to
* {@link #removeContainerForPreemption} to ensure we do not mark a
* container for preemption and never consider it again and avoid memory
* leaks.
*
* @param containers container to mark
*/
void addContainersForPreemption(Collection<RMContainer> containers) {
containersForPreemption.addAll(containers);
}
/**
* @return set of containers marked for preemption.
*/
Set<RMContainer> getContainersForPreemption() {
return containersForPreemption;
}
/**
* Remove container from the set of containers marked for preemption.
*
* @param container container to remove
*/
void removeContainerForPreemption(RMContainer container) {
containersForPreemption.remove(container);
}
}

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.Serializable;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
/**
* Helper class to track starved applications.
*
* Initially, this uses a blocking queue. We could use other data structures
* in the future. This class also has some methods to simplify testing.
*/
class FSStarvedApps {
// List of apps to be processed by the preemption thread.
private PriorityBlockingQueue<FSAppAttempt> appsToProcess;
// App being currently processed. This assumes a single reader.
private FSAppAttempt appBeingProcessed;
FSStarvedApps() {
appsToProcess = new PriorityBlockingQueue<>(10, new StarvationComparator());
}
/**
* Add a starved application if it is not already added.
* @param app application to add
*/
void addStarvedApp(FSAppAttempt app) {
if (!app.equals(appBeingProcessed) && !appsToProcess.contains(app)) {
appsToProcess.add(app);
}
}
/**
* Blocking call to fetch the next app to process. The returned app is
* tracked until the next call to this method. This tracking assumes a
* single reader.
*
* @return starved application to process
* @throws InterruptedException if interrupted while waiting
*/
FSAppAttempt take() throws InterruptedException {
// Reset appBeingProcessed before the blocking call
appBeingProcessed = null;
// Blocking call to fetch the next starved application
FSAppAttempt app = appsToProcess.take();
appBeingProcessed = app;
return app;
}
private static class StarvationComparator implements
Comparator<FSAppAttempt>, Serializable {
private static final long serialVersionUID = 1;
@Override
public int compare(FSAppAttempt app1, FSAppAttempt app2) {
int ret = 1;
if (Resources.fitsIn(app1.getStarvation(), app2.getStarvation())) {
ret = -1;
}
return ret;
}
}
}

View File

@ -20,6 +20,17 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@ -85,17 +96,6 @@
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* A scheduler that schedules resources between a set of queues. The scheduler
* keeps track of the resources used by each queue, and attempts to maintain
@ -122,6 +122,7 @@ public class FairScheduler extends
AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
private FairSchedulerConfiguration conf;
private FSContext context;
private Resource incrAllocation;
private QueueManager queueMgr;
private boolean usePortForNodeName;
@ -149,6 +150,9 @@ public class FairScheduler extends
@VisibleForTesting
Thread schedulingThread;
Thread preemptionThread;
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@ -156,25 +160,6 @@ public class FairScheduler extends
FSQueueMetrics rootMetrics;
FSOpDurations fsOpDurations;
// Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
// Preemption related variables
protected boolean preemptionEnabled;
protected float preemptionUtilizationThreshold;
// How often tasks are preempted
protected long preemptionInterval;
// ms to wait before force killing stuff (must be longer than a couple
// of heartbeats to give task-kill commands a chance to act).
protected long waitTimeBeforeKill;
// Containers whose AMs have been warned that they will be preempted soon.
private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
private float reservableNodesRatio; // percentage of available nodes
// an app can be reserved on
@ -210,11 +195,17 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
@VisibleForTesting
public FSContext getContext() {
return context;
}
public boolean isAtLeastReservationThreshold(
ResourceCalculator resourceCalculator, Resource resource) {
return Resources.greaterThanOrEqual(resourceCalculator,
@ -311,7 +302,6 @@ public void run() {
}
long start = getClock().getTime();
update();
preemptTasksIfNecessary();
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateThreadRunDuration(duration);
} catch (InterruptedException ie) {
@ -353,7 +343,6 @@ protected void update() {
try {
writeLock.lock();
long start = getClock().getTime();
updateStarvationStats(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@ -361,214 +350,30 @@ protected void update() {
rootQueue.updateDemand();
Resource clusterResource = getClusterResource();
rootQueue.setFairShare(clusterResource);
// Recursively compute fair shares for all queues
// and update metrics
rootQueue.recomputeShares();
rootQueue.update(clusterResource, shouldAttemptPreemption());
// Update metrics
updateRootQueueMetrics();
if (LOG.isDebugEnabled()) {
if (--updatesToSkipForDebug < 0) {
updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
LOG.debug("Cluster Capacity: " + clusterResource + " Allocations: "
+ rootMetrics.getAllocatedResources() + " Availability: "
+ Resource.newInstance(rootMetrics.getAvailableMB(),
rootMetrics.getAvailableVirtualCores()) + " Demand: " + rootQueue
.getDemand());
LOG.debug("Cluster Capacity: " + clusterResource +
" Allocations: " + rootMetrics.getAllocatedResources() +
" Availability: " + Resource.newInstance(
rootMetrics.getAvailableMB(),
rootMetrics.getAvailableVirtualCores()) +
" Demand: " + rootQueue.getDemand());
}
}
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateCallDuration(duration);
} finally {
writeLock.unlock();
}
}
/**
* Update the preemption fields for all QueueScheduables, i.e. the times since
* each queue last was at its guaranteed share and over its fair share
* threshold for each type of task.
*/
private void updateStarvationStats() {
lastPreemptionUpdateTime = getClock().getTime();
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
sched.updateStarvationStats();
}
}
/**
* Check for queues that need tasks preempted, either because they have been
* below their guaranteed share for minSharePreemptionTimeout or they have
* been below their fair share threshold for the fairSharePreemptionTimeout. If
* such queues exist, compute how many tasks of each type need to be preempted
* and then select the right ones using preemptTasks.
*/
protected void preemptTasksIfNecessary() {
try {
writeLock.lock();
if (!shouldAttemptPreemption()) {
return;
}
long curTime = getClock().getTime();
if (curTime - lastPreemptCheckTime < preemptionInterval) {
return;
}
lastPreemptCheckTime = curTime;
Resource resToPreempt = Resources.clone(Resources.none());
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
}
if (isResourceGreaterThanNone(resToPreempt)) {
preemptResources(resToPreempt);
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateCallDuration(duration);
}
} finally {
writeLock.unlock();
}
}
/**
* Preempt a quantity of resources. Each round, we start from the root queue,
* level-by-level, until choosing a candidate application.
* The policy for prioritizing preemption for each queue depends on its
* SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
* most over its fair share; (2) FIFO, choose the childSchedulable that is
* latest launched.
* Inside each application, we further prioritize preemption by choosing
* containers with lowest priority to preempt.
* We make sure that no queue is placed below its fair share in the process.
*/
protected void preemptResources(Resource toPreempt) {
long start = getClock().getTime();
if (Resources.equals(toPreempt, Resources.none())) {
return;
}
// Scan down the list of containers we've already warned and kill them
// if we need to. Remove any containers from the list that we don't need
// or that are no longer running.
Iterator<RMContainer> warnedIter = warnedContainers.iterator();
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
if ((container.getState() == RMContainerState.RUNNING ||
container.getState() == RMContainerState.ALLOCATED) &&
isResourceGreaterThanNone(toPreempt)) {
warnOrKillContainer(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
} else {
warnedIter.remove();
}
}
try {
// Reset preemptedResource for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.resetPreemptedResources();
}
while (isResourceGreaterThanNone(toPreempt)) {
RMContainer container =
getQueueManager().getRootQueue().preemptContainer();
if (container == null) {
break;
} else {
warnOrKillContainer(container);
warnedContainers.add(container);
Resources.subtractFrom(
toPreempt, container.getContainer().getResource());
}
}
} finally {
// Clear preemptedResources for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.clearPreemptedResources();
}
}
long duration = getClock().getTime() - start;
fsOpDurations.addPreemptCallDuration(duration);
}
private boolean isResourceGreaterThanNone(Resource toPreempt) {
return (toPreempt.getMemorySize() > 0) || (toPreempt.getVirtualCores() > 0);
}
protected void warnOrKillContainer(RMContainer container) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = getSchedulerApp(appAttemptId);
FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + queue.getName());
Long time = app.getContainerPreemptionTime(container);
if (time != null) {
// if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
// proceed with kill
if (time + waitTimeBeforeKill < getClock().getTime()) {
ContainerStatus status =
SchedulerUtils.createPreemptedContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
super.completedContainer(container, status, RMContainerEventType.KILL);
if (LOG.isDebugEnabled()) {
LOG.debug("Killing container" + container +
" (after waiting for preemption for " +
(getClock().getTime() - time) + "ms)");
}
}
} else {
// track the request in the FSAppAttempt itself
app.addPreemption(container, getClock().getTime());
}
}
/**
* Return the resource amount that this queue is allowed to preempt, if any.
* If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and
* this min share. If it has been below its fair share preemption threshold
* for at least the fairSharePreemptionTimeout, it should preempt enough tasks
* to get up to its full fair share. If both conditions hold, we preempt the
* max of the two amounts (this shouldn't happen unless someone sets the
* timeouts to be identical for some reason).
*/
protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
long minShareTimeout = sched.getMinSharePreemptionTimeout();
long fairShareTimeout = sched.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
Resource clusterResource = getClusterResource();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.componentwiseMin(
sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(calc, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
Resource target = Resources.componentwiseMin(
sched.getFairShare(), sched.getDemand());
resDueToFairShare = Resources.max(calc, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
Resource deficit = Resources.max(calc, clusterResource,
resDueToMinShare, resDueToFairShare);
if (Resources.greaterThan(calc, clusterResource,
deficit, Resources.none())) {
String message = "Should preempt " + deficit + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare
+ ", resDueToFairShare = " + resDueToFairShare;
LOG.info(message);
}
return deficit;
}
public RMContainerTokenSecretManager
getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
@ -1226,12 +1031,12 @@ private void updateRootQueueMetrics() {
* @return true if preemption should be attempted, false otherwise.
*/
private boolean shouldAttemptPreemption() {
if (preemptionEnabled) {
Resource clusterResource = getClusterResource();
return (preemptionUtilizationThreshold < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemorySize(),
if (context.isPreemptionEnabled()) {
return (context.getPreemptionUtilizationThreshold() < Math.max(
(float) rootMetrics.getAllocatedMB() /
getClusterResource().getMemorySize(),
(float) rootMetrics.getAllocatedVirtualCores() /
clusterResource.getVirtualCores()));
getClusterResource().getVirtualCores()));
}
return false;
}
@ -1419,15 +1224,10 @@ private void initScheduler(Configuration conf) throws IOException {
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
preemptionEnabled = this.conf.getPreemptionEnabled();
preemptionUtilizationThreshold =
this.conf.getPreemptionUtilizationThreshold();
assignMultiple = this.conf.getAssignMultiple();
maxAssignDynamic = this.conf.isMaxAssignDynamic();
maxAssign = this.conf.getMaxAssign();
sizeBasedWeight = this.conf.getSizeBasedWeight();
preemptionInterval = this.conf.getPreemptionInterval();
waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
usePortForNodeName = this.conf.getUsePortForNodeName();
reservableNodesRatio = this.conf.getReservableNodes();
@ -1465,6 +1265,10 @@ private void initScheduler(Configuration conf) throws IOException {
schedulingThread.setName("FairSchedulerContinuousScheduling");
schedulingThread.setDaemon(true);
}
if (this.conf.getPreemptionEnabled()) {
createPreemptionThread();
}
} finally {
writeLock.unlock();
}
@ -1481,6 +1285,11 @@ private void initScheduler(Configuration conf) throws IOException {
}
}
@VisibleForTesting
protected void createPreemptionThread() {
preemptionThread = new FSPreemptionThread(this);
}
private void updateReservationThreshold() {
Resource newThreshold = Resources.multiply(
getIncrementResourceCapability(),
@ -1500,6 +1309,9 @@ private void startSchedulerThreads() {
"schedulingThread is null");
schedulingThread.start();
}
if (preemptionThread != null) {
preemptionThread.start();
}
allocsLoader.start();
} finally {
writeLock.unlock();
@ -1532,6 +1344,10 @@ public void serviceStop() throws Exception {
schedulingThread.join(THREAD_JOIN_TIMEOUT_MS);
}
}
if (preemptionThread != null) {
preemptionThread.interrupt();
preemptionThread.join(THREAD_JOIN_TIMEOUT_MS);
}
if (allocsLoader != null) {
allocsLoader.stop();
}

View File

@ -55,50 +55,45 @@ public interface Schedulable {
* Name of job/queue, used for debugging as well as for breaking ties in
* scheduling order deterministically.
*/
public String getName();
String getName();
/**
* Maximum number of resources required by this Schedulable. This is defined as
* number of currently utilized resources + number of unlaunched resources (that
* are either not yet launched or need to be speculated).
*/
public Resource getDemand();
Resource getDemand();
/** Get the aggregate amount of resources consumed by the schedulable. */
public Resource getResourceUsage();
Resource getResourceUsage();
/** Minimum Resource share assigned to the schedulable. */
public Resource getMinShare();
Resource getMinShare();
/** Maximum Resource share assigned to the schedulable. */
public Resource getMaxShare();
Resource getMaxShare();
/** Job/queue weight in fair sharing. */
public ResourceWeights getWeights();
ResourceWeights getWeights();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
public long getStartTime();
long getStartTime();
/** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
public Priority getPriority();
Priority getPriority();
/** Refresh the Schedulable's demand and those of its children if any. */
public void updateDemand();
void updateDemand();
/**
* Assign a container on this node if possible, and return the amount of
* resources assigned.
*/
public Resource assignContainer(FSSchedulerNode node);
/**
* Preempt a container from this Schedulable if possible.
*/
public RMContainer preemptContainer();
Resource assignContainer(FSSchedulerNode node);
/** Get the fair share assigned to this Schedulable. */
public Resource getFairShare();
Resource getFairShare();
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare);
void setFairShare(Resource fairShare);
}

View File

@ -17,14 +17,6 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.junit.Assert;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -40,7 +32,9 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -51,9 +45,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
public class FairSchedulerTestBase {
public final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
@ -71,9 +73,14 @@ public class FairSchedulerTestBase {
private static final int SLEEP_DURATION = 10;
private static final int SLEEP_RETRIES = 1000;
/**
* The list of nodes added to the cluster using the {@link #addNode} method.
*/
protected final List<RMNode> rmNodes = new ArrayList<>();
// Helper methods
public Configuration createConfiguration() {
Configuration conf = new YarnConfiguration();
conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
@ -281,4 +288,18 @@ protected void checkAppConsumption(FSAppAttempt app, Resource resource)
Assert.assertEquals(resource.getVirtualCores(),
app.getCurrentConsumption().getVirtualCores());
}
/**
* Add a node to the cluster and track the nodes in {@link #rmNodes}.
* @param memory memory capacity of the node
* @param cores cpu capacity of the node
*/
protected void addNode(int memory, int cores) {
int id = rmNodes.size() + 1;
RMNode node =
MockNodes.newNodeInfo(1, Resources.createResource(memory, cores), id,
"127.0.0." + id);
scheduler.handle(new NodeAddedSchedulerEvent(node));
rmNodes.add(node);
}
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.HashSet;
import java.util.Set;
public class FairSchedulerWithMockPreemption extends FairScheduler {
@Override
protected void createPreemptionThread() {
preemptionThread = new MockPreemptionThread(this);
}
static class MockPreemptionThread extends FSPreemptionThread {
private Set<FSAppAttempt> appsAdded = new HashSet<>();
private int totalAppsAdded = 0;
MockPreemptionThread(FairScheduler scheduler) {
super(scheduler);
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
FSAppAttempt app = context.getStarvedApps().take();
appsAdded.add(app);
totalAppsAdded++;
} catch (InterruptedException e) {
return;
}
}
}
int uniqueAppsAdded() {
return appsAdded.size();
}
int totalAppsAdded() {
return totalAppsAdded;
}
}
}

View File

@ -85,11 +85,6 @@ public Resource assignContainer(FSSchedulerNode node) {
return null;
}
@Override
public RMContainer preemptContainer() {
return null;
}
@Override
public Resource getFairShare() {
return this.fairShare;

View File

@ -0,0 +1,256 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
/**
* Test class to verify identification of app starvation
*/
public class TestFSAppStarvation extends FairSchedulerTestBase {
private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
private static final String[] QUEUES =
{"no-preemption", "minshare", "fairshare.child", "drf.child"};
private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;
@Before
public void setup() {
createConfiguration();
conf.set(YarnConfiguration.RM_SCHEDULER,
FairSchedulerWithMockPreemption.class.getCanonicalName());
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
ALLOC_FILE.getAbsolutePath());
conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
}
@After
public void teardown() {
ALLOC_FILE.delete();
conf = null;
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
}
/*
* Test to verify application starvation is computed only when preemption
* is enabled.
*/
@Test
public void testPreemptionDisabled() throws Exception {
conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
setupClusterAndSubmitJobs();
assertNull("Found starved apps even when preemption is turned off",
scheduler.getContext().getStarvedApps());
}
/*
* Test to verify application starvation is computed correctly when
* preemption is turned on.
*/
@Test
public void testPreemptionEnabled() throws Exception {
setupClusterAndSubmitJobs();
assertNotNull("FSContext does not have an FSStarvedApps instance",
scheduler.getContext().getStarvedApps());
assertEquals("Expecting 3 starved applications, one each for the "
+ "minshare and fairshare queues",
3, preemptionThread.uniqueAppsAdded());
// Verify the apps get added again on a subsequent update
scheduler.update();
Thread.yield();
verifyLeafQueueStarvation();
assertTrue("Each app is marked as starved exactly once",
preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
}
/*
* Test to verify app starvation is computed only when the cluster
* utilization threshold is over the preemption threshold.
*/
@Test
public void testClusterUtilizationThreshold() throws Exception {
// Set preemption threshold to 1.1, so the utilization is always lower
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 1.1f);
setupClusterAndSubmitJobs();
assertNotNull("FSContext does not have an FSStarvedApps instance",
scheduler.getContext().getStarvedApps());
assertEquals("Found starved apps when preemption threshold is over 100%", 0,
preemptionThread.totalAppsAdded());
}
private void verifyLeafQueueStarvation() {
for (String q : QUEUES) {
if (!q.equals("no-preemption")) {
boolean isStarved =
scheduler.getQueueManager().getLeafQueue(q, false).isStarved();
assertTrue(isStarved);
}
}
}
private void setupClusterAndSubmitJobs() throws Exception {
setupStarvedCluster();
submitAppsToEachLeafQueue();
sendEnoughNodeUpdatesToAssignFully();
// Sleep to hit the preemption timeouts
Thread.sleep(10);
// Scheduler update to populate starved apps
scheduler.update();
// Wait for apps to be processed by MockPreemptionThread
Thread.yield();
}
/**
* Setup the cluster for starvation testing:
* 1. Create FS allocation file
* 2. Create and start MockRM
* 3. Add two nodes to the cluster
* 4. Submit an app that uses up all resources on the cluster
*/
private void setupStarvedCluster() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
// Default queue
out.println("<queue name=\"default\">");
out.println("</queue>");
// Queue with preemption disabled
out.println("<queue name=\"no-preemption\">");
out.println("<fairSharePreemptionThreshold>0" +
"</fairSharePreemptionThreshold>");
out.println("</queue>");
// Queue with minshare preemption enabled
out.println("<queue name=\"minshare\">");
out.println("<fairSharePreemptionThreshold>0" +
"</fairSharePreemptionThreshold>");
out.println("<minSharePreemptionTimeout>0" +
"</minSharePreemptionTimeout>");
out.println("<minResources>2048mb,2vcores</minResources>");
out.println("</queue>");
// FAIR queue with fairshare preemption enabled
out.println("<queue name=\"fairshare\">");
out.println("<fairSharePreemptionThreshold>1" +
"</fairSharePreemptionThreshold>");
out.println("<fairSharePreemptionTimeout>0" +
"</fairSharePreemptionTimeout>");
out.println("<schedulingPolicy>fair</schedulingPolicy>");
addChildQueue(out);
out.println("</queue>");
// DRF queue with fairshare preemption enabled
out.println("<queue name=\"drf\">");
out.println("<fairSharePreemptionThreshold>1" +
"</fairSharePreemptionThreshold>");
out.println("<fairSharePreemptionTimeout>0" +
"</fairSharePreemptionTimeout>");
out.println("<schedulingPolicy>drf</schedulingPolicy>");
addChildQueue(out);
out.println("</queue>");
out.println("</allocations>");
out.close();
assertTrue("Allocation file does not exist, not running the test",
ALLOC_FILE.exists());
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
scheduler.preemptionThread;
// Create and add two nodes to the cluster
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
// Create an app that takes up all the resources on the cluster
ApplicationAttemptId app
= createSchedulingRequest(1024, 1, "root.default", "default", 8);
scheduler.update();
sendEnoughNodeUpdatesToAssignFully();
assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
}
private void addChildQueue(PrintWriter out) {
// Child queue under fairshare with same settings
out.println("<queue name=\"child\">");
out.println("<fairSharePreemptionThreshold>1" +
"</fairSharePreemptionThreshold>");
out.println("<fairSharePreemptionTimeout>0" +
"</fairSharePreemptionTimeout>");
out.println("</queue>");
}
private void submitAppsToEachLeafQueue() {
for (String queue : QUEUES) {
createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
}
scheduler.update();
}
private void sendEnoughNodeUpdatesToAssignFully() {
for (RMNode node : rmNodes) {
NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent =
new NodeUpdateSchedulerEvent(node);
for (int i = 0; i < NODE_CAPACITY_MULTIPLE; i++) {
scheduler.handle(nodeUpdateSchedulerEvent);
}
}
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@ -106,12 +105,8 @@ public void test() throws Exception {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueA\"></queue>");
out.println("<queue name=\"queueB\"></queue>");
out.println("</allocations>");
out.close();
@ -144,162 +139,6 @@ public void test() throws Exception {
scheduler.update();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
// Queue A should be above min share, B below.
FSLeafQueue queueA =
scheduler.getQueueManager().getLeafQueue("queueA", false);
FSLeafQueue queueB =
scheduler.getQueueManager().getLeafQueue("queueB", false);
assertFalse(queueA.isStarvedForMinShare());
assertTrue(queueB.isStarvedForMinShare());
// Node checks in again, should allocate for B
scheduler.handle(nodeEvent2);
// Now B should have min share ( = demand here)
assertFalse(queueB.isStarvedForMinShare());
}
@Test (timeout = 5000)
public void testIsStarvedForFairShare() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.2</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.8</weight>");
out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
out.println("<queue name=\"queueB1\">");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
out.println("</queue>");
out.println("</queue>");
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
scheduler.update();
// Queue A wants 4 * 1024. Node update gives this all to A
createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
for (int i = 0; i < 4; i ++) {
scheduler.handle(nodeEvent2);
}
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
assertEquals(4 * 1024, queueA.getResourceUsage().getMemorySize());
// Both queue B1 and queue B2 want 3 * 1024
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
scheduler.update();
for (int i = 0; i < 4; i ++) {
scheduler.handle(nodeEvent2);
}
FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
assertEquals(2 * 1024, queueB1.getResourceUsage().getMemorySize());
assertEquals(2 * 1024, queueB2.getResourceUsage().getMemorySize());
// For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share
// threshold is 1.6 * 1024
assertFalse(queueB1.isStarvedForFairShare());
// For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share
// threshold is 2.4 * 1024
assertTrue(queueB2.isStarvedForFairShare());
// Node checks in again
scheduler.handle(nodeEvent2);
scheduler.handle(nodeEvent2);
assertEquals(3 * 1024, queueB1.getResourceUsage().getMemorySize());
assertEquals(3 * 1024, queueB2.getResourceUsage().getMemorySize());
// Both queue B1 and queue B2 usages go to 3 * 1024
assertFalse(queueB1.isStarvedForFairShare());
assertFalse(queueB2.isStarvedForFairShare());
}
@Test (timeout = 5000)
public void testIsStarvedForFairShareDRF() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.5</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.5</weight>");
out.println("</queue>");
out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
scheduler.update();
// Queue A wants 7 * 1024, 1. Node update gives this all to A
createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
assertEquals(7 * 1024, queueA.getResourceUsage().getMemorySize());
assertEquals(1, queueA.getResourceUsage().getVirtualCores());
// Queue B has 3 reqs :
// 1) 2 * 1024, 5 .. which will be granted
// 2) 1 * 1024, 1 .. which will be granted
// 3) 1 * 1024, 1 .. which wont
createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
scheduler.update();
for (int i = 0; i < 3; i ++) {
scheduler.handle(nodeEvent2);
}
FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
assertEquals(3 * 1024, queueB.getResourceUsage().getMemorySize());
assertEquals(6, queueB.getResourceUsage().getVirtualCores());
scheduler.update();
// Verify that Queue us not starved for fair share..
// Since the Starvation logic now uses DRF when the policy = drf, The
// Queue should not be starved
assertFalse(queueB.isStarvedForFairShare());
}
@Test

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import static org.junit.Assert.assertEquals;
/**
* QueueManager tests that require a real scheduler
*/
public class TestQueueManagerRealScheduler extends FairSchedulerTestBase {
private final static File ALLOC_FILE = new File(TEST_DIR, "test-queue-mgr");
@Before
public void setup() throws IOException {
createConfiguration();
writeAllocFile(30, 40);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
ALLOC_FILE.getAbsolutePath());
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
}
@After
public void teardown() {
ALLOC_FILE.deleteOnExit();
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
}
private void writeAllocFile(int defaultFairShareTimeout,
int fairShareTimeout) throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<queue name=\"queueB1\">");
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("</queue>");
out.println("<defaultMinSharePreemptionTimeout>15"
+ "</defaultMinSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionTimeout>" +
+ defaultFairShareTimeout + "</defaultFairSharePreemptionTimeout>");
out.println("<fairSharePreemptionTimeout>"
+ fairShareTimeout + "</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
}
@Test
public void testBackwardsCompatiblePreemptionConfiguration()
throws IOException {
// Check the min/fair share preemption timeout for each queue
QueueManager queueMgr = scheduler.getQueueManager();
assertEquals(30000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("default")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueA")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
.getFairSharePreemptionTimeout());
assertEquals(30000, queueMgr.getQueue("queueC")
.getFairSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("root")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("default")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueA")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueB")
.getMinSharePreemptionTimeout());
assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
.getMinSharePreemptionTimeout());
assertEquals(15000, queueMgr.getQueue("queueC")
.getMinSharePreemptionTimeout());
// Lower the fairshare preemption timeouts and verify it is picked
// correctly.
writeAllocFile(25, 30);
scheduler.reinitialize(conf, resourceManager.getRMContext());
assertEquals(25000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout());
}
}

View File

@ -329,11 +329,6 @@ public Resource assignContainer(FSSchedulerNode node) {
throw new UnsupportedOperationException();
}
@Override
public RMContainer preemptContainer() {
throw new UnsupportedOperationException();
}
@Override
public Resource getFairShare() {
throw new UnsupportedOperationException();