YARN-6163. FS Preemption is a trickle for severely starved applications. (kasha)

This commit is contained in:
Karthik Kambatla 2017-02-15 23:16:01 -08:00
parent a136936d01
commit 6c25dbcdc0
12 changed files with 584 additions and 144 deletions

View File

@ -182,6 +182,24 @@ public class Resources {
return subtractFrom(clone(lhs), rhs);
}
/**
* Subtract <code>rhs</code> from <code>lhs</code> and reset any negative
* values to zero.
* @param lhs {@link Resource} to subtract from
* @param rhs {@link Resource} to subtract
* @return the value of lhs after subtraction
*/
public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
subtractFrom(lhs, rhs);
if (lhs.getMemorySize() < 0) {
lhs.setMemorySize(0);
}
if (lhs.getVirtualCores() < 0) {
lhs.setVirtualCores(0);
}
return lhs;
}
public static Resource negate(Resource resource) {
return subtract(NONE, resource);
}

View File

@ -127,6 +127,7 @@ public abstract class AbstractYarnScheduler
*/
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;
protected long nmHeartbeatInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@ -163,6 +164,9 @@ public abstract class AbstractYarnScheduler
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
nmHeartbeatInterval =
conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
long configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);

View File

@ -87,6 +87,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
private final Set<RMContainer> containersToPreempt = new HashSet<>();
private Resource fairshareStarvation = Resources.none();
private long lastTimeAtFairShare;
private long nextStarvationCheck;
// minShareStarvation attributed to this application by the leaf queue
private Resource minshareStarvation = Resources.none();
@ -211,15 +212,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
}
for (FSSchedulerNode node: blacklistNodeIds) {
Resources.subtractFrom(availableResources,
Resources.subtractFromNonNegative(availableResources,
node.getUnallocatedResource());
}
if (availableResources.getMemorySize() < 0) {
availableResources.setMemorySize(0);
}
if (availableResources.getVirtualCores() < 0) {
availableResources.setVirtualCores(0);
}
}
/**
@ -529,6 +524,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return Resources.add(fairshareStarvation, minshareStarvation);
}
/**
* Get last computed fairshare starvation.
*
* @return last computed fairshare starvation
*/
Resource getFairshareStarvation() {
return fairshareStarvation;
}
/**
* Set the minshare attributed to this application. To be called only from
* {@link FSLeafQueue#updateStarvedApps}.
@ -1077,17 +1081,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
/**
* Helper method that computes the extent of fairshare fairshareStarvation.
* Helper method that computes the extent of fairshare starvation.
* @return freshly computed fairshare starvation
*/
Resource fairShareStarvation() {
Resource threshold = Resources.multiply(
getFairShare(), fsQueue.getFairSharePreemptionThreshold());
Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
Resource starvation = Resources.componentwiseMin(threshold, demand);
Resources.subtractFromNonNegative(starvation, getResourceUsage());
long now = scheduler.getClock().getTime();
boolean starved = Resources.greaterThan(
fsQueue.getPolicy().getResourceCalculator(),
scheduler.getClusterResource(), starvation, Resources.none());
boolean starved = !Resources.isNone(starvation);
if (!starved) {
lastTimeAtFairShare = now;
@ -1111,6 +1115,81 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return !Resources.isNone(fairshareStarvation);
}
/**
* Fetch a list of RRs corresponding to the extent the app is starved
* (fairshare and minshare). This method considers the number of containers
* in a RR and also only one locality-level (the first encountered
* resourceName).
*
* @return list of {@link ResourceRequest}s corresponding to the amount of
* starvation.
*/
List<ResourceRequest> getStarvedResourceRequests() {
// List of RRs we build in this method to return
List<ResourceRequest> ret = new ArrayList<>();
// Track visited RRs to avoid the same RR at multiple locality levels
VisitedResourceRequestTracker visitedRRs =
new VisitedResourceRequestTracker(scheduler.getNodeTracker());
// Start with current starvation and track the pending amount
Resource pending = getStarvation();
for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
if (Resources.isNone(pending)) {
// Found enough RRs to match the starvation
break;
}
// See if we have already seen this RR
if (!visitedRRs.visit(rr)) {
continue;
}
// A RR can have multiple containers of a capability. We need to
// compute the number of containers that fit in "pending".
int numContainersThatFit = (int) Math.floor(
Resources.ratio(scheduler.getResourceCalculator(),
pending, rr.getCapability()));
if (numContainersThatFit == 0) {
// This RR's capability is too large to fit in pending
continue;
}
// If the RR is only partially being satisfied, include only the
// partial number of containers.
if (numContainersThatFit < rr.getNumContainers()) {
rr = ResourceRequest.newInstance(rr.getPriority(),
rr.getResourceName(), rr.getCapability(), numContainersThatFit);
}
// Add the RR to return list and adjust "pending" accordingly
ret.add(rr);
Resources.subtractFromNonNegative(pending,
Resources.multiply(rr.getCapability(), rr.getNumContainers()));
}
return ret;
}
/**
* Notify this app that preemption has been triggered to make room for
* outstanding demand. The app should not be considered starved until after
* the specified delay.
*
* @param delayBeforeNextStarvationCheck duration to wait
*/
void preemptionTriggered(long delayBeforeNextStarvationCheck) {
nextStarvationCheck =
scheduler.getClock().getTime() + delayBeforeNextStarvationCheck;
}
/**
* Whether this app's starvation should be considered.
*/
boolean shouldCheckForStarvation() {
return scheduler.getClock().getTime() >= nextStarvationCheck;
}
/* Schedulable methods implementation */
@Override
@ -1123,6 +1202,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return demand;
}
/**
* Get the current app's unsatisfied demand.
*/
Resource getPendingDemand() {
return Resources.subtract(demand, getResourceUsage());
}
@Override
public long getStartTime() {
return startTime;

View File

@ -219,6 +219,63 @@ public class FSLeafQueue extends FSQueue {
}
}
/**
* Compute the extent of fairshare starvation for a set of apps.
*
* @param appsWithDemand apps to compute fairshare starvation for
* @return aggregate fairshare starvation for all apps
*/
private Resource updateStarvedAppsFairshare(
TreeSet<FSAppAttempt> appsWithDemand) {
Resource fairShareStarvation = Resources.clone(none());
// Fetch apps with unmet demand sorted by fairshare starvation
for (FSAppAttempt app : appsWithDemand) {
Resource appStarvation = app.fairShareStarvation();
if (!Resources.isNone(appStarvation)) {
context.getStarvedApps().addStarvedApp(app);
Resources.addTo(fairShareStarvation, appStarvation);
} else {
break;
}
}
return fairShareStarvation;
}
/**
* Distribute minshare starvation to a set of apps
* @param appsWithDemand set of apps
* @param minShareStarvation minshare starvation to distribute
*/
private void updateStarvedAppsMinshare(
final TreeSet<FSAppAttempt> appsWithDemand,
final Resource minShareStarvation) {
Resource pending = Resources.clone(minShareStarvation);
// Keep adding apps to the starved list until the unmet demand goes over
// the remaining minshare
for (FSAppAttempt app : appsWithDemand) {
if (!Resources.isNone(pending)) {
Resource appMinShare = app.getPendingDemand();
Resources.subtractFromNonNegative(
appMinShare, app.getFairshareStarvation());
if (Resources.greaterThan(policy.getResourceCalculator(),
scheduler.getClusterResource(), appMinShare, pending)) {
Resources.subtractFromNonNegative(appMinShare, pending);
pending = none();
} else {
Resources.subtractFromNonNegative(pending, appMinShare);
}
app.setMinshareStarvation(appMinShare);
context.getStarvedApps().addStarvedApp(app);
} else {
// Reset minshare starvation in case we had set it in a previous
// iteration
app.resetMinshareStarvation();
}
}
}
/**
* Helper method to identify starved applications. This needs to be called
* ONLY from {@link #updateInternal}, after the application shares
@ -237,44 +294,20 @@ public class FSLeafQueue extends FSQueue {
* starved due to fairshare, there might still be starved applications.
*/
private void updateStarvedApps() {
// First identify starved applications and track total amount of
// starvation (in resources)
Resource fairShareStarvation = Resources.clone(none());
// Fetch apps with pending demand
TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
// Fetch apps with unmet demand sorted by fairshare starvation
TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
for (FSAppAttempt app : appsWithDemand) {
Resource appStarvation = app.fairShareStarvation();
if (!Resources.equals(Resources.none(), appStarvation)) {
context.getStarvedApps().addStarvedApp(app);
Resources.addTo(fairShareStarvation, appStarvation);
} else {
break;
}
}
// Process apps with fairshare starvation
Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);
// Compute extent of minshare starvation
Resource minShareStarvation = minShareStarvation();
// Compute minshare starvation that is not subsumed by fairshare starvation
Resources.subtractFrom(minShareStarvation, fairShareStarvation);
Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);
// Keep adding apps to the starved list until the unmet demand goes over
// the remaining minshare
for (FSAppAttempt app : appsWithDemand) {
if (Resources.greaterThan(policy.getResourceCalculator(),
scheduler.getClusterResource(), minShareStarvation, none())) {
Resource appPendingDemand =
Resources.subtract(app.getDemand(), app.getResourceUsage());
Resources.subtractFrom(minShareStarvation, appPendingDemand);
app.setMinshareStarvation(appPendingDemand);
context.getStarvedApps().addStarvedApp(app);
} else {
// Reset minshare starvation in case we had set it in a previous
// iteration
app.resetMinshareStarvation();
}
}
// Assign this minshare to apps with pending demand over fairshare
updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
}
@Override
@ -352,7 +385,7 @@ public class FSLeafQueue extends FSQueue {
return assigned;
}
for (FSAppAttempt sched : fetchAppsWithDemand()) {
for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
continue;
}
@ -368,14 +401,24 @@ public class FSLeafQueue extends FSQueue {
return assigned;
}
private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
/**
* Fetch the subset of apps that have unmet demand. When used for
* preemption-related code (as opposed to allocation), omits apps that
* should not be checked for starvation.
*
* @param assignment whether the apps are for allocation containers, as
* opposed to preemption calculations
* @return Set of apps with unmet demand
*/
private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
TreeSet<FSAppAttempt> pendingForResourceApps =
new TreeSet<>(policy.getComparator());
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
Resource pending = app.getAppAttemptResourceUsage().getPending();
if (!pending.equals(none())) {
if (!Resources.isNone(pending) &&
(assignment || app.shouldCheckForStarvation())) {
pendingForResourceApps.add(app);
}
}

View File

@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
@ -43,20 +41,26 @@ class FSPreemptionThread extends Thread {
protected final FSContext context;
private final FairScheduler scheduler;
private final long warnTimeBeforeKill;
private final long delayBeforeNextStarvationCheck;
private final Timer preemptionTimer;
FSPreemptionThread(FairScheduler scheduler) {
setDaemon(true);
setName("FSPreemptionThread");
this.scheduler = scheduler;
this.context = scheduler.getContext();
FairSchedulerConfiguration fsConf = scheduler.getConf();
context.setPreemptionEnabled();
context.setPreemptionUtilizationThreshold(
fsConf.getPreemptionUtilizationThreshold());
warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
preemptionTimer = new Timer("Preemption Timer", true);
setDaemon(true);
setName("FSPreemptionThread");
warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
long allocDelay = (fsConf.isContinuousSchedulingEnabled()
? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs
: 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
fsConf.getWaitTimeBeforeNextStarvationCheck();
}
public void run() {
@ -64,13 +68,8 @@ class FSPreemptionThread extends Thread {
FSAppAttempt starvedApp;
try{
starvedApp = context.getStarvedApps().take();
if (!Resources.isNone(starvedApp.getStarvation())) {
PreemptableContainers containers =
identifyContainersToPreempt(starvedApp);
if (containers != null) {
preemptContainers(containers.containers);
}
}
preemptContainers(identifyContainersToPreempt(starvedApp));
starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
} catch (InterruptedException e) {
LOG.info("Preemption thread interrupted! Exiting.");
return;
@ -79,58 +78,57 @@ class FSPreemptionThread extends Thread {
}
/**
* Given an app, identify containers to preempt to satisfy the app's next
* resource request.
* Given an app, identify containers to preempt to satisfy the app's
* starvation.
*
* Mechanics:
* 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
* starvation.
* 2. For each {@link ResourceRequest}, iterate through matching
* nodes and identify containers to preempt all on one node, also
* optimizing for least number of AM container preemptions.
*
* @param starvedApp starved application for which we are identifying
* preemption targets
* @return list of containers to preempt to satisfy starvedApp, null if the
* app cannot be satisfied by preempting any running containers
* @return list of containers to preempt to satisfy starvedApp
*/
private PreemptableContainers identifyContainersToPreempt(
private List<RMContainer> identifyContainersToPreempt(
FSAppAttempt starvedApp) {
PreemptableContainers bestContainers = null;
List<RMContainer> containersToPreempt = new ArrayList<>();
// Find the nodes that match the next resource request
SchedulingPlacementSet nextPs =
starvedApp.getAppSchedulingInfo().getFirstSchedulingPlacementSet();
PendingAsk firstPendingAsk = nextPs.getPendingAsk(ResourceRequest.ANY);
// TODO (KK): Should we check other resource requests if we can't match
// the first one?
// Iterate through enough RRs to address app's starvation
for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
for (int i = 0; i < rr.getNumContainers(); i++) {
PreemptableContainers bestContainers = null;
List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
.getNodesByResourceName(rr.getResourceName());
for (FSSchedulerNode node : potentialNodes) {
// TODO (YARN-5829): Attempt to reserve the node for starved app.
if (isNodeAlreadyReserved(node, starvedApp)) {
continue;
}
Resource requestCapability = firstPendingAsk.getPerAllocationResource();
int maxAMContainers = bestContainers == null ?
Integer.MAX_VALUE : bestContainers.numAMContainers;
PreemptableContainers preemptableContainers =
identifyContainersToPreemptOnNode(
rr.getCapability(), node, maxAMContainers);
if (preemptableContainers != null) {
// This set is better than any previously identified set.
bestContainers = preemptableContainers;
if (preemptableContainers.numAMContainers == 0) {
break;
}
}
} // End of iteration through nodes for one RR
List<FSSchedulerNode> potentialNodes =
scheduler.getNodeTracker().getNodesByResourceName(
nextPs.getAcceptedResouceNames().next().toString());
// From the potential nodes, pick a node that has enough containers
// from apps over their fairshare
for (FSSchedulerNode node : potentialNodes) {
// TODO (YARN-5829): Attempt to reserve the node for starved app. The
// subsequent if-check needs to be reworked accordingly.
FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
// This node is already reserved by another app. Let us not consider
// this for preemption.
continue;
}
int maxAMContainers = bestContainers == null ?
Integer.MAX_VALUE : bestContainers.numAMContainers;
PreemptableContainers preemptableContainers =
identifyContainersToPreemptOnNode(requestCapability, node,
maxAMContainers);
if (preemptableContainers != null) {
if (preemptableContainers.numAMContainers == 0) {
return preemptableContainers;
} else {
bestContainers = preemptableContainers;
if (bestContainers != null && bestContainers.containers.size() > 0) {
containersToPreempt.addAll(bestContainers.containers);
trackPreemptionsAgainstNode(bestContainers.containers);
}
}
}
return bestContainers;
} // End of iteration over RRs
return containersToPreempt;
}
/**
@ -181,23 +179,25 @@ class FSPreemptionThread extends Thread {
return null;
}
private void preemptContainers(List<RMContainer> containers) {
// Mark the containers as being considered for preemption on the node.
// Make sure the containers are subsequently removed by calling
// FSSchedulerNode#removeContainerForPreemption.
if (containers.size() > 0) {
FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
.getNode(containers.get(0).getNodeId());
node.addContainersForPreemption(containers);
}
private boolean isNodeAlreadyReserved(
FSSchedulerNode node, FSAppAttempt app) {
FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
return nodeReservedApp != null && !nodeReservedApp.equals(app);
}
private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
.getNode(containers.get(0).getNodeId());
node.addContainersForPreemption(containers);
}
private void preemptContainers(List<RMContainer> containers) {
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container " + container +
" from queue " + queue.getName());
" from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}

View File

@ -1774,4 +1774,8 @@ public class FairScheduler extends
public float getReservableNodesRatio() {
return reservableNodesRatio;
}
long getNMHeartbeatInterval() {
return nmHeartbeatInterval;
}
}

View File

@ -114,12 +114,24 @@ public class FairSchedulerConfiguration extends Configuration {
protected static final String PREEMPTION_THRESHOLD =
CONF_PREFIX + "preemption.cluster-utilization-threshold";
protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
/**
* Configurable delay (ms) before an app's starvation is considered after
* it is identified. This is to give the scheduler enough time to
* allocate containers post preemption. This delay is added to the
* {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
*
* This is intended to be a backdoor on production clusters, and hence
* intentionally not documented.
*/
protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
protected static final long
DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;
/** Whether to assign multiple containers in one check-in. */
public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@ -251,8 +263,9 @@ public class FairSchedulerConfiguration extends Configuration {
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
public int getPreemptionInterval() {
return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
public long getWaitTimeBeforeNextStarvationCheck() {
return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
}
public int getWaitTimeBeforeKill() {

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Applications place {@link ResourceRequest}s at multiple levels. This is a
* helper class that allows tracking if a {@link ResourceRequest} has been
* visited at a different locality level.
*
* This is implemented for {@link FSAppAttempt#getStarvedResourceRequests()}.
* The implementation is not thread-safe.
*/
class VisitedResourceRequestTracker {
private static final Log LOG =
LogFactory.getLog(VisitedResourceRequestTracker.class);
private final Map<Priority, Map<Resource, TrackerPerPriorityResource>> map =
new HashMap<>();
private final ClusterNodeTracker<FSSchedulerNode> nodeTracker;
VisitedResourceRequestTracker(
ClusterNodeTracker<FSSchedulerNode> nodeTracker) {
this.nodeTracker = nodeTracker;
}
/**
* Check if the {@link ResourceRequest} is visited before, and track it.
* @param rr {@link ResourceRequest} to visit
* @return true if <code>rr</code> is the first visit across all
* locality levels, false otherwise
*/
boolean visit(ResourceRequest rr) {
Priority priority = rr.getPriority();
Resource capability = rr.getCapability();
Map<Resource, TrackerPerPriorityResource> subMap = map.get(priority);
if (subMap == null) {
subMap = new HashMap<>();
map.put(priority, subMap);
}
TrackerPerPriorityResource tracker = subMap.get(capability);
if (tracker == null) {
tracker = new TrackerPerPriorityResource();
subMap.put(capability, tracker);
}
return tracker.visit(rr.getResourceName());
}
private class TrackerPerPriorityResource {
private Set<String> racksWithNodesVisited = new HashSet<>();
private Set<String> racksVisted = new HashSet<>();
private boolean anyVisited;
private boolean visitAny() {
if (racksVisted.isEmpty() && racksWithNodesVisited.isEmpty()) {
anyVisited = true;
}
return anyVisited;
}
private boolean visitRack(String rackName) {
if (anyVisited || racksWithNodesVisited.contains(rackName)) {
return false;
} else {
racksVisted.add(rackName);
return true;
}
}
private boolean visitNode(String rackName) {
if (anyVisited || racksVisted.contains(rackName)) {
return false;
} else {
racksWithNodesVisited.add(rackName);
return true;
}
}
/**
* Based on whether <code>resourceName</code> is a node, rack or ANY,
* check if this has been visited earlier.
*
* A node is considered visited if its rack or ANY have been visited.
* A rack is considered visited if any nodes or ANY have been visited.
* Any is considered visited if any of the nodes/racks have been visited.
*
* @param resourceName nodename or rackname or ANY
* @return true if this is the first visit, false otherwise
*/
private boolean visit(String resourceName) {
if (resourceName.equals(ResourceRequest.ANY)) {
return visitAny();
}
List<FSSchedulerNode> nodes =
nodeTracker.getNodesByResourceName(resourceName);
int numNodes = nodes.size();
if (numNodes == 0) {
LOG.error("Found ResourceRequest for a non-existent node/rack named " +
resourceName);
return false;
}
if (numNodes == 1) {
// Found a single node. To be safe, let us verify it is a node and
// not a rack with a single node.
FSSchedulerNode node = nodes.get(0);
if (node.getNodeName().equals(resourceName)) {
return visitNode(node.getRackName());
}
}
// At this point, it is not ANY or a node. Must be a rack
return visitRack(resourceName);
}
}
}

View File

@ -21,6 +21,8 @@ import java.util.HashSet;
import java.util.Set;
public class FairSchedulerWithMockPreemption extends FairScheduler {
static final long DELAY_FOR_NEXT_STARVATION_CHECK_MS = 10 * 60 * 1000;
@Override
protected void createPreemptionThread() {
preemptionThread = new MockPreemptionThread(this);
@ -30,7 +32,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
private Set<FSAppAttempt> appsAdded = new HashSet<>();
private int totalAppsAdded = 0;
MockPreemptionThread(FairScheduler scheduler) {
private MockPreemptionThread(FairScheduler scheduler) {
super(scheduler);
}
@ -41,6 +43,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
FSAppAttempt app = context.getStarvedApps().take();
appsAdded.add(app);
totalAppsAdded++;
app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS);
} catch (InterruptedException e) {
return;
}

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import static org.junit.Assert.assertEquals;
@ -43,6 +44,8 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
private final ControlledClock clock = new ControlledClock();
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
private static final String[] QUEUES =
@ -99,11 +102,17 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
+ "minshare and fairshare queues",
3, preemptionThread.uniqueAppsAdded());
// Verify the apps get added again on a subsequent update
// Verify apps are added again only after the set delay for starvation has
// passed.
clock.tickSec(1);
scheduler.update();
Thread.yield();
assertEquals("Apps re-added even before starvation delay passed",
preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded());
verifyLeafQueueStarvation();
clock.tickMsec(
FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS);
scheduler.update();
assertTrue("Each app is marked as starved exactly once",
preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
}
@ -141,7 +150,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
sendEnoughNodeUpdatesToAssignFully();
// Sleep to hit the preemption timeouts
Thread.sleep(10);
clock.tickMsec(10);
// Scheduler update to populate starved apps
scheduler.update();
@ -208,8 +217,9 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
ALLOC_FILE.exists());
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
scheduler.setClock(clock);
resourceManager.start();
preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
scheduler.preemptionThread;

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -49,6 +50,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
private static final int GB = 1024;
// Scheduler clock
private final ControlledClock clock = new ControlledClock();
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
@ -60,25 +64,28 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
// Starving app that is expected to instigate preemption
private FSAppAttempt starvingApp;
@Parameterized.Parameters
public static Collection<Boolean[]> getParameters() {
return Arrays.asList(new Boolean[][] {
{true}, {false}});
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> getParameters() {
return Arrays.asList(new Object[][] {
{"FairSharePreemption", true},
{"MinSharePreemption", false}});
}
public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
public TestFairSchedulerPreemption(String name, boolean fairshare)
throws IOException {
fairsharePreemption = fairshare;
writeAllocFile();
}
@Before
public void setup() {
public void setup() throws IOException {
createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
ALLOC_FILE.getAbsolutePath());
conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
setupCluster();
}
@After
@ -166,8 +173,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private void setupCluster() throws IOException {
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
scheduler.setClock(clock);
resourceManager.start();
// Create and add two nodes to the cluster
addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
@ -197,7 +205,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
*
* @param queueName queue name
*/
private void takeAllResource(String queueName) {
private void takeAllResources(String queueName) {
// Create an app that takes up all the resources on the cluster
ApplicationAttemptId appAttemptId
= createSchedulingRequest(GB, 1, queueName, "default",
@ -227,8 +235,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
starvingApp = scheduler.getSchedulerApp(appAttemptId);
// Sleep long enough to pass
Thread.sleep(10);
// Move clock enough to identify starvation
clock.tickSec(1);
scheduler.update();
}
@ -243,14 +251,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
*/
private void submitApps(String queue1, String queue2)
throws InterruptedException {
takeAllResource(queue1);
takeAllResources(queue1);
preemptHalfResources(queue2);
}
private void verifyPreemption() throws InterruptedException {
// Sleep long enough for four containers to be preempted. Note that the
// starved app must be queued four times for containers to be preempted.
for (int i = 0; i < 10000; i++) {
// Sleep long enough for four containers to be preempted.
for (int i = 0; i < 100; i++) {
if (greedyApp.getLiveContainers().size() == 4) {
break;
}
@ -268,7 +275,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private void verifyNoPreemption() throws InterruptedException {
// Sleep long enough to ensure not even one container is preempted.
for (int i = 0; i < 600; i++) {
for (int i = 0; i < 100; i++) {
if (greedyApp.getLiveContainers().size() != 8) {
break;
}
@ -279,7 +286,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@Test
public void testPreemptionWithinSameLeafQueue() throws Exception {
setupCluster();
String queue = "root.preemptable.child-1";
submitApps(queue, queue);
if (fairsharePreemption) {
@ -291,21 +297,18 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
setupCluster();
submitApps("root.preemptable.child-1", "root.preemptable.child-2");
verifyPreemption();
}
@Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception {
setupCluster();
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
verifyPreemption();
}
@Test
public void testNoPreemptionFromDisallowedQueue() throws Exception {
setupCluster();
submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
verifyNoPreemption();
}
@ -331,9 +334,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@Test
public void testPreemptionSelectNonAMContainer() throws Exception {
setupCluster();
takeAllResource("root.preemptable.child-1");
takeAllResources("root.preemptable.child-1");
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import java.util.List;
public class TestVisitedResourceRequestTracker {
private final ClusterNodeTracker<FSSchedulerNode>
nodeTracker = new ClusterNodeTracker<>();
private final ResourceRequest
anyRequest, rackRequest, node1Request, node2Request;
private final String NODE_VISITED = "The node is already visited. ";
private final String RACK_VISITED = "The rack is already visited. ";
private final String ANY_VISITED = "ANY is already visited. ";
private final String NODE_FAILURE = "The node is visited again.";
private final String RACK_FAILURE = "The rack is visited again.";
private final String ANY_FAILURE = "ANY is visited again.";
private final String FIRST_CALL_FAILURE = "First call to visit failed.";
public TestVisitedResourceRequestTracker() {
List<RMNode> rmNodes =
MockNodes.newNodes(1, 2, Resources.createResource(8192, 8));
FSSchedulerNode node1 = new FSSchedulerNode(rmNodes.get(0), false);
nodeTracker.addNode(node1);
node1Request = createRR(node1.getNodeName(), 1);
FSSchedulerNode node2 = new FSSchedulerNode(rmNodes.get(1), false);
node2Request = createRR(node2.getNodeName(), 1);
nodeTracker.addNode(node2);
anyRequest = createRR(ResourceRequest.ANY, 2);
rackRequest = createRR(node1.getRackName(), 2);
}
private ResourceRequest createRR(String resourceName, int count) {
return ResourceRequest.newInstance(
Priority.UNDEFINED, resourceName, Resources.none(), count);
}
@Test
public void testVisitAnyRequestFirst() {
VisitedResourceRequestTracker tracker =
new VisitedResourceRequestTracker(nodeTracker);
// Visit ANY request first
assertTrue(FIRST_CALL_FAILURE, tracker.visit(anyRequest));
// All other requests should return false
assertFalse(ANY_VISITED + RACK_FAILURE, tracker.visit(rackRequest));
assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node1Request));
assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node2Request));
}
@Test
public void testVisitRackRequestFirst() {
VisitedResourceRequestTracker tracker =
new VisitedResourceRequestTracker(nodeTracker);
// Visit rack request first
assertTrue(FIRST_CALL_FAILURE, tracker.visit(rackRequest));
// All other requests should return false
assertFalse(RACK_VISITED + ANY_FAILURE, tracker.visit(anyRequest));
assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node1Request));
assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node2Request));
}
@Test
public void testVisitNodeRequestFirst() {
VisitedResourceRequestTracker tracker =
new VisitedResourceRequestTracker(nodeTracker);
// Visit node1 first
assertTrue(FIRST_CALL_FAILURE, tracker.visit(node1Request));
// Rack and ANY should return false
assertFalse(NODE_VISITED + ANY_FAILURE, tracker.visit(anyRequest));
assertFalse(NODE_VISITED + RACK_FAILURE, tracker.visit(rackRequest));
// The other node should return true
assertTrue(NODE_VISITED + "Different node visit failed",
tracker.visit(node2Request));
}
}