diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 5e623d9d6bc..b2f17c4e07e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -182,6 +182,24 @@ public class Resources {
return subtractFrom(clone(lhs), rhs);
}
+ /**
+ * Subtract rhs from lhs and reset any negative
+ * values to zero.
+ * @param lhs {@link Resource} to subtract from
+ * @param rhs {@link Resource} to subtract
+ * @return the value of lhs after subtraction
+ */
+ public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
+ subtractFrom(lhs, rhs);
+ if (lhs.getMemorySize() < 0) {
+ lhs.setMemorySize(0);
+ }
+ if (lhs.getVirtualCores() < 0) {
+ lhs.setVirtualCores(0);
+ }
+ return lhs;
+ }
+
public static Resource negate(Resource resource) {
return subtract(NONE, resource);
}
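
A minimal usage sketch of the new helper (not part of the patch), with hypothetical values; Resource.newInstance is the standard YARN records factory. Unlike plain subtractFrom, any component that would go negative is clamped to zero:

    Resource lhs = Resource.newInstance(1024, 1);  // 1 GB, 1 vcore
    Resource rhs = Resource.newInstance(2048, 1);  // 2 GB, 1 vcore
    Resource out = Resources.subtractFromNonNegative(lhs, rhs);
    // subtractFrom alone would leave memory at -1024; the helper clamps
    // it, so out (and the mutated lhs) is <memory: 0, vCores: 0>.
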
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index dc376a33620..3851d20e36b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -120,6 +120,7 @@ public abstract class AbstractYarnScheduler
*/
protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;
+ protected long nmHeartbeatInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -156,6 +157,9 @@ public abstract class AbstractYarnScheduler
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ nmHeartbeatInterval =
+ conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
long configuredMaximumAllocationWaitTime =
conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index f67aa3c005d..28c599ebdfb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -86,6 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
private final Set<RMContainer> containersToPreempt = new HashSet<>();
private Resource fairshareStarvation = Resources.none();
private long lastTimeAtFairShare;
+ private long nextStarvationCheck;
// minShareStarvation attributed to this application by the leaf queue
private Resource minshareStarvation = Resources.none();
@@ -210,15 +211,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
}
for (FSSchedulerNode node: blacklistNodeIds) {
- Resources.subtractFrom(availableResources,
+ Resources.subtractFromNonNegative(availableResources,
node.getUnallocatedResource());
}
- if (availableResources.getMemorySize() < 0) {
- availableResources.setMemorySize(0);
- }
- if (availableResources.getVirtualCores() < 0) {
- availableResources.setVirtualCores(0);
- }
}
/**
@@ -528,6 +523,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return Resources.add(fairshareStarvation, minshareStarvation);
}
+ /**
+ * Get last computed fairshare starvation.
+ *
+ * @return last computed fairshare starvation
+ */
+ Resource getFairshareStarvation() {
+ return fairshareStarvation;
+ }
+
/**
* Set the minshare attributed to this application. To be called only from
* {@link FSLeafQueue#updateStarvedApps}.
@@ -1066,17 +1070,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
/**
- * Helper method that computes the extent of fairshare fairshareStarvation.
+ * Helper method that computes the extent of fairshare starvation.
+ * @return freshly computed fairshare starvation
*/
Resource fairShareStarvation() {
Resource threshold = Resources.multiply(
getFairShare(), fsQueue.getFairSharePreemptionThreshold());
- Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+ Resource starvation = Resources.componentwiseMin(threshold, demand);
+ Resources.subtractFromNonNegative(starvation, getResourceUsage());
long now = scheduler.getClock().getTime();
- boolean starved = Resources.greaterThan(
- fsQueue.getPolicy().getResourceCalculator(),
- scheduler.getClusterResource(), starvation, Resources.none());
+ boolean starved = !Resources.isNone(starvation);
if (!starved) {
lastTimeAtFairShare = now;
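
A worked example of the rewritten formula, using hypothetical memory-only numbers: with a fair share of 8192 MB and a preemption threshold of 0.5, the threshold is 4096 MB; a demand of 3072 MB gives componentwiseMin(threshold, demand) = 3072 MB, and a usage of 1024 MB leaves a starvation of max(0, 3072 - 1024) = 2048 MB. Capping at demand keeps an app that cannot use its full fair share from being reported as starved for resources it never requested.
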
@@ -1104,6 +1108,81 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return !Resources.isNone(fairshareStarvation);
}
+ /**
+ * Fetch a list of RRs corresponding to the extent to which the app is
+ * starved (fairshare and minshare). This method accounts for the number
+ * of containers in an RR and considers only one locality level (the
+ * first encountered resourceName).
+ *
+ * @return list of {@link ResourceRequest}s corresponding to the amount of
+ * starvation.
+ */
+ List<ResourceRequest> getStarvedResourceRequests() {
+ // List of RRs we build in this method to return
+ List<ResourceRequest> ret = new ArrayList<>();
+
+ // Track visited RRs to avoid the same RR at multiple locality levels
+ VisitedResourceRequestTracker visitedRRs =
+ new VisitedResourceRequestTracker(scheduler.getNodeTracker());
+
+ // Start with current starvation and track the pending amount
+ Resource pending = getStarvation();
+ for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+ if (Resources.isNone(pending)) {
+ // Found enough RRs to match the starvation
+ break;
+ }
+
+ // See if we have already seen this RR
+ if (!visitedRRs.visit(rr)) {
+ continue;
+ }
+
+ // A RR can have multiple containers of a capability. We need to
+ // compute the number of containers that fit in "pending".
+ int numContainersThatFit = (int) Math.floor(
+ Resources.ratio(scheduler.getResourceCalculator(),
+ pending, rr.getCapability()));
+ if (numContainersThatFit == 0) {
+ // This RR's capability is too large to fit in pending
+ continue;
+ }
+
+ // If the RR is only partially being satisfied, include only the
+ // partial number of containers.
+ if (numContainersThatFit < rr.getNumContainers()) {
+ rr = ResourceRequest.newInstance(rr.getPriority(),
+ rr.getResourceName(), rr.getCapability(), numContainersThatFit);
+ }
+
+ // Add the RR to return list and adjust "pending" accordingly
+ ret.add(rr);
+ Resources.subtractFromNonNegative(pending,
+ Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+ }
+
+ return ret;
+ }
+
+ /**
+ * Notify this app that preemption has been triggered to make room for
+ * outstanding demand. The app should not be considered starved until after
+ * the specified delay.
+ *
+ * @param delayBeforeNextStarvationCheck duration to wait
+ */
+ void preemptionTriggered(long delayBeforeNextStarvationCheck) {
+ nextStarvationCheck =
+ scheduler.getClock().getTime() + delayBeforeNextStarvationCheck;
+ }
+
+ /**
+ * Whether this app's starvation should be considered.
+ */
+ boolean shouldCheckForStarvation() {
+ return scheduler.getClock().getTime() >= nextStarvationCheck;
+ }
+
/* Schedulable methods implementation */
@Override
@@ -1116,6 +1195,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return demand;
}
+ /**
+ * Get the current app's unsatisfied demand.
+ */
+ Resource getPendingDemand() {
+ return Resources.subtract(demand, getResourceUsage());
+ }
+
@Override
public long getStartTime() {
return startTime;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index f55e1b59471..30295cd6e60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -219,6 +219,63 @@ public class FSLeafQueue extends FSQueue {
}
}
+ /**
+ * Compute the extent of fairshare starvation for a set of apps.
+ *
+ * @param appsWithDemand apps to compute fairshare starvation for
+ * @return aggregate fairshare starvation for all apps
+ */
+ private Resource updateStarvedAppsFairshare(
+ TreeSet<FSAppAttempt> appsWithDemand) {
+ Resource fairShareStarvation = Resources.clone(none());
+ // Fetch apps with unmet demand sorted by fairshare starvation
+ for (FSAppAttempt app : appsWithDemand) {
+ Resource appStarvation = app.fairShareStarvation();
+ if (!Resources.isNone(appStarvation)) {
+ context.getStarvedApps().addStarvedApp(app);
+ Resources.addTo(fairShareStarvation, appStarvation);
+ } else {
+ break;
+ }
+ }
+ return fairShareStarvation;
+ }
+
+ /**
+ * Distribute minshare starvation to a set of apps
+ * @param appsWithDemand set of apps
+ * @param minShareStarvation minshare starvation to distribute
+ */
+ private void updateStarvedAppsMinshare(
+ final TreeSet<FSAppAttempt> appsWithDemand,
+ final Resource minShareStarvation) {
+ Resource pending = Resources.clone(minShareStarvation);
+
+ // Keep adding apps to the starved list until the unmet demand goes over
+ // the remaining minshare
+ for (FSAppAttempt app : appsWithDemand) {
+ if (!Resources.isNone(pending)) {
+ Resource appMinShare = app.getPendingDemand();
+ Resources.subtractFromNonNegative(
+ appMinShare, app.getFairshareStarvation());
+
+ if (Resources.greaterThan(policy.getResourceCalculator(),
+ scheduler.getClusterResource(), appMinShare, pending)) {
+ Resources.subtractFromNonNegative(appMinShare, pending);
+ pending = none();
+ } else {
+ Resources.subtractFromNonNegative(pending, appMinShare);
+ }
+ app.setMinshareStarvation(appMinShare);
+ context.getStarvedApps().addStarvedApp(app);
+ } else {
+ // Reset minshare starvation in case we had set it in a previous
+ // iteration
+ app.resetMinshareStarvation();
+ }
+ }
+ }
+
/**
* Helper method to identify starved applications. This needs to be called
* ONLY from {@link #updateInternal}, after the application shares
@@ -237,44 +294,20 @@ public class FSLeafQueue extends FSQueue {
* starved due to fairshare, there might still be starved applications.
*/
private void updateStarvedApps() {
- // First identify starved applications and track total amount of
- // starvation (in resources)
- Resource fairShareStarvation = Resources.clone(none());
+ // Fetch apps with pending demand
+ TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
- // Fetch apps with unmet demand sorted by fairshare starvation
- TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
- for (FSAppAttempt app : appsWithDemand) {
- Resource appStarvation = app.fairShareStarvation();
- if (!Resources.equals(Resources.none(), appStarvation)) {
- context.getStarvedApps().addStarvedApp(app);
- Resources.addTo(fairShareStarvation, appStarvation);
- } else {
- break;
- }
- }
+ // Process apps with fairshare starvation
+ Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);
// Compute extent of minshare starvation
Resource minShareStarvation = minShareStarvation();
// Compute minshare starvation that is not subsumed by fairshare starvation
- Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+ Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);
- // Keep adding apps to the starved list until the unmet demand goes over
- // the remaining minshare
- for (FSAppAttempt app : appsWithDemand) {
- if (Resources.greaterThan(policy.getResourceCalculator(),
- scheduler.getClusterResource(), minShareStarvation, none())) {
- Resource appPendingDemand =
- Resources.subtract(app.getDemand(), app.getResourceUsage());
- Resources.subtractFrom(minShareStarvation, appPendingDemand);
- app.setMinshareStarvation(appPendingDemand);
- context.getStarvedApps().addStarvedApp(app);
- } else {
- // Reset minshare starvation in case we had set it in a previous
- // iteration
- app.resetMinshareStarvation();
- }
- }
+ // Assign this minshare to apps with pending demand over fairshare
+ updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
}
@Override
@@ -352,7 +385,7 @@ public class FSLeafQueue extends FSQueue {
return assigned;
}
- for (FSAppAttempt sched : fetchAppsWithDemand()) {
+ for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
continue;
}
@@ -368,14 +401,24 @@ public class FSLeafQueue extends FSQueue {
return assigned;
}
- private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+ /**
+ * Fetch the subset of apps that have unmet demand. When used for
+ * preemption-related code (as opposed to allocation), omits apps that
+ * should not be checked for starvation.
+ *
+ * @param assignment whether the apps are for allocation containers, as
+ * opposed to preemption calculations
+ * @return Set of apps with unmet demand
+ */
+ private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
TreeSet<FSAppAttempt> pendingForResourceApps =
new TreeSet<>(policy.getComparator());
readLock.lock();
try {
for (FSAppAttempt app : runnableApps) {
Resource pending = app.getAppAttemptResourceUsage().getPending();
- if (!pending.equals(none())) {
+ if (!Resources.isNone(pending) &&
+ (assignment || app.shouldCheckForStarvation())) {
pendingForResourceApps.add(app);
}
}
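
A condensed view of the filter that the new boolean parameter introduces (names as in the hunk above): on the allocation path every app with pending demand is a candidate, while the preemption path additionally skips apps still inside their post-preemption starvation-check delay.

    boolean include = !Resources.isNone(pending)
        && (assignment || app.shouldCheckForStarvation());
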
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 8b4e4bd9293..af73c10f796 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -41,20 +41,26 @@ class FSPreemptionThread extends Thread {
protected final FSContext context;
private final FairScheduler scheduler;
private final long warnTimeBeforeKill;
+ private final long delayBeforeNextStarvationCheck;
private final Timer preemptionTimer;
FSPreemptionThread(FairScheduler scheduler) {
+ setDaemon(true);
+ setName("FSPreemptionThread");
this.scheduler = scheduler;
this.context = scheduler.getContext();
FairSchedulerConfiguration fsConf = scheduler.getConf();
context.setPreemptionEnabled();
context.setPreemptionUtilizationThreshold(
fsConf.getPreemptionUtilizationThreshold());
- warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
preemptionTimer = new Timer("Preemption Timer", true);
- setDaemon(true);
- setName("FSPreemptionThread");
+ warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+ long allocDelay = (fsConf.isContinuousSchedulingEnabled()
+ ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs
+ : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
+ delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
+ fsConf.getWaitTimeBeforeNextStarvationCheck();
}
public void run() {
@@ -62,13 +68,8 @@ class FSPreemptionThread extends Thread {
FSAppAttempt starvedApp;
try {
starvedApp = context.getStarvedApps().take();
- if (!Resources.isNone(starvedApp.getStarvation())) {
- PreemptableContainers containers =
- identifyContainersToPreempt(starvedApp);
- if (containers != null) {
- preemptContainers(containers.containers);
- }
- }
+ preemptContainers(identifyContainersToPreempt(starvedApp));
+ starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
} catch (InterruptedException e) {
LOG.info("Preemption thread interrupted! Exiting.");
return;
@@ -77,55 +78,57 @@ class FSPreemptionThread extends Thread {
}
/**
- * Given an app, identify containers to preempt to satisfy the app's next
- * resource request.
+ * Given an app, identify containers to preempt to satisfy the app's
+ * starvation.
+ *
+ * Mechanics:
+ * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
+ * starvation.
+ * 2. For each {@link ResourceRequest}, iterate through matching
+ * nodes and identify containers to preempt, all on one node, also
+ * optimizing for the least number of AM container preemptions.
*
* @param starvedApp starved application for which we are identifying
* preemption targets
- * @return list of containers to preempt to satisfy starvedApp, null if the
- * app cannot be satisfied by preempting any running containers
+ * @return list of containers to preempt to satisfy starvedApp
*/
- private PreemptableContainers identifyContainersToPreempt(
+ private List<RMContainer> identifyContainersToPreempt(
FSAppAttempt starvedApp) {
- PreemptableContainers bestContainers = null;
+ List<RMContainer> containersToPreempt = new ArrayList<>();
- // Find the nodes that match the next resource request
- ResourceRequest request = starvedApp.getNextResourceRequest();
- // TODO (KK): Should we check other resource requests if we can't match
- // the first one?
+ // Iterate through enough RRs to address app's starvation
+ for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
+ for (int i = 0; i < rr.getNumContainers(); i++) {
+ PreemptableContainers bestContainers = null;
+ List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
+ .getNodesByResourceName(rr.getResourceName());
+ for (FSSchedulerNode node : potentialNodes) {
+ // TODO (YARN-5829): Attempt to reserve the node for starved app.
+ if (isNodeAlreadyReserved(node, starvedApp)) {
+ continue;
+ }
- Resource requestCapability = request.getCapability();
- List<FSSchedulerNode> potentialNodes =
- scheduler.getNodeTracker().getNodesByResourceName(
- request.getResourceName());
+ int maxAMContainers = bestContainers == null ?
+ Integer.MAX_VALUE : bestContainers.numAMContainers;
+ PreemptableContainers preemptableContainers =
+ identifyContainersToPreemptOnNode(
+ rr.getCapability(), node, maxAMContainers);
+ if (preemptableContainers != null) {
+ // This set is better than any previously identified set.
+ bestContainers = preemptableContainers;
+ if (preemptableContainers.numAMContainers == 0) {
+ break;
+ }
+ }
+ } // End of iteration through nodes for one RR
- // From the potential nodes, pick a node that has enough containers
- // from apps over their fairshare
- for (FSSchedulerNode node : potentialNodes) {
- // TODO (YARN-5829): Attempt to reserve the node for starved app. The
- // subsequent if-check needs to be reworked accordingly.
- FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
- if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
- // This node is already reserved by another app. Let us not consider
- // this for preemption.
- continue;
- }
-
- int maxAMContainers = bestContainers == null ?
- Integer.MAX_VALUE : bestContainers.numAMContainers;
- PreemptableContainers preemptableContainers =
- identifyContainersToPreemptOnNode(requestCapability, node,
- maxAMContainers);
- if (preemptableContainers != null) {
- if (preemptableContainers.numAMContainers == 0) {
- return preemptableContainers;
- } else {
- bestContainers = preemptableContainers;
+ if (bestContainers != null && bestContainers.containers.size() > 0) {
+ containersToPreempt.addAll(bestContainers.containers);
+ trackPreemptionsAgainstNode(bestContainers.containers);
}
}
- }
-
- return bestContainers;
+ } // End of iteration over RRs
+ return containersToPreempt;
}
/**
@@ -176,23 +179,25 @@ class FSPreemptionThread extends Thread {
return null;
}
- private void preemptContainers(List<RMContainer> containers) {
- // Mark the containers as being considered for preemption on the node.
- // Make sure the containers are subsequently removed by calling
- // FSSchedulerNode#removeContainerForPreemption.
- if (containers.size() > 0) {
- FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
- .getNode(containers.get(0).getNodeId());
- node.addContainersForPreemption(containers);
- }
+ private boolean isNodeAlreadyReserved(
+ FSSchedulerNode node, FSAppAttempt app) {
+ FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+ return nodeReservedApp != null && !nodeReservedApp.equals(app);
+ }
+ private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+ FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
+ .getNode(containers.get(0).getNodeId());
+ node.addContainersForPreemption(containers);
+ }
+
+ private void preemptContainers(List<RMContainer> containers) {
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
- FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container " + container +
- " from queue " + queue.getName());
+ " from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}
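
With stock defaults the starvation-check delay computed in the constructor works out as follows (a back-of-the-envelope sketch; the defaults are the usual YARN values, not something this patch changes): warnTimeBeforeKill = 15000 ms, an NM heartbeat interval of 1000 ms gives allocDelay = 4 * 1000 = 4000 ms, and waitTimeBeforeNextStarvationCheck = 10000 ms, so delayBeforeNextStarvationCheck = 15000 + 4000 + 10000 = 29000 ms.
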
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index f75f2ddd8f0..5594140b20a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1772,4 +1772,8 @@ public class FairScheduler extends
public float getReservableNodesRatio() {
return reservableNodesRatio;
}
+
+ long getNMHeartbeatInterval() {
+ return nmHeartbeatInterval;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index b18dd7dee42..8e8e37b2dbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -114,12 +114,24 @@ public class FairSchedulerConfiguration extends Configuration {
protected static final String PREEMPTION_THRESHOLD =
CONF_PREFIX + "preemption.cluster-utilization-threshold";
protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
-
- protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
- protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+
protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
+ /**
+ * Configurable delay (ms) before an app's starvation is considered after
+ * it is identified. This is to give the scheduler enough time to
+ * allocate containers after preemption. This delay is added on top of
+ * {@link #WAIT_TIME_BEFORE_KILL} and the allocation delay of a few node
+ * heartbeats (or continuous-scheduling runs).
+ *
+ * This is intended to be a backdoor on production clusters, and hence
+ * intentionally not documented.
+ */
+ protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
+ CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
+ protected static final long
+ DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;
+
/** Whether to assign multiple containers in one check-in. */
public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@@ -251,8 +263,9 @@ public class FairSchedulerConfiguration extends Configuration {
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
}
- public int getPreemptionInterval() {
- return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+ public long getWaitTimeBeforeNextStarvationCheck() {
+ return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
+ DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
}
public int getWaitTimeBeforeKill() {
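
Since CONF_PREFIX resolves to "yarn.scheduler.fair." in this class, the new knob can be set like any other scheduler property; a minimal sketch:

    Configuration conf = new Configuration();
    // Full key: CONF_PREFIX + "waitTimeBeforeNextStarvationCheck"
    conf.setLong(
        "yarn.scheduler.fair.waitTimeBeforeNextStarvationCheck", 30000L);
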
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
new file mode 100644
index 00000000000..f157263eb5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Applications place {@link ResourceRequest}s at multiple levels. This is a
+ * helper class that allows tracking if a {@link ResourceRequest} has been
+ * visited at a different locality level.
+ *
+ * This is implemented for {@link FSAppAttempt#getStarvedResourceRequests()}.
+ * The implementation is not thread-safe.
+ */
+class VisitedResourceRequestTracker {
+ private static final Log LOG =
+ LogFactory.getLog(VisitedResourceRequestTracker.class);
+ private final Map<Priority, Map<Resource, TrackerPerPriorityResource>> map =
+ new HashMap<>();
+ private final ClusterNodeTracker<FSSchedulerNode> nodeTracker;
+
+ VisitedResourceRequestTracker(
+ ClusterNodeTracker<FSSchedulerNode> nodeTracker) {
+ this.nodeTracker = nodeTracker;
+ }
+
+ /**
+ * Check if the {@link ResourceRequest} is visited before, and track it.
+ * @param rr {@link ResourceRequest} to visit
+ * @return true if rr is the first visit across all
+ * locality levels, false otherwise
+ */
+ boolean visit(ResourceRequest rr) {
+ Priority priority = rr.getPriority();
+ Resource capability = rr.getCapability();
+
+ Map<Resource, TrackerPerPriorityResource> subMap = map.get(priority);
+ if (subMap == null) {
+ subMap = new HashMap<>();
+ map.put(priority, subMap);
+ }
+
+ TrackerPerPriorityResource tracker = subMap.get(capability);
+ if (tracker == null) {
+ tracker = new TrackerPerPriorityResource();
+ subMap.put(capability, tracker);
+ }
+
+ return tracker.visit(rr.getResourceName());
+ }
+
+ private class TrackerPerPriorityResource {
+ private Set<String> racksWithNodesVisited = new HashSet<>();
+ private Set<String> racksVisited = new HashSet<>();
+ private boolean anyVisited;
+
+ private boolean visitAny() {
+ if (racksVisited.isEmpty() && racksWithNodesVisited.isEmpty()) {
+ anyVisited = true;
+ }
+ return anyVisited;
+ }
+
+ private boolean visitRack(String rackName) {
+ if (anyVisited || racksWithNodesVisited.contains(rackName)) {
+ return false;
+ } else {
+ racksVisited.add(rackName);
+ return true;
+ }
+ }
+
+ private boolean visitNode(String rackName) {
+ if (anyVisited || racksVisited.contains(rackName)) {
+ return false;
+ } else {
+ racksWithNodesVisited.add(rackName);
+ return true;
+ }
+ }
+
+ /**
+ * Based on whether resourceName is a node, rack or ANY,
+ * check if this has been visited earlier.
+ *
+ * A node is considered visited if its rack or ANY have been visited.
+ * A rack is considered visited if any of its nodes or ANY have been visited.
+ * ANY is considered visited if any node or rack has been visited.
+ *
+ * @param resourceName nodename or rackname or ANY
+ * @return true if this is the first visit, false otherwise
+ */
+ private boolean visit(String resourceName) {
+ if (resourceName.equals(ResourceRequest.ANY)) {
+ return visitAny();
+ }
+
+ List<FSSchedulerNode> nodes =
+ nodeTracker.getNodesByResourceName(resourceName);
+ int numNodes = nodes.size();
+ if (numNodes == 0) {
+ LOG.error("Found ResourceRequest for a non-existent node/rack named " +
+ resourceName);
+ return false;
+ }
+
+ if (numNodes == 1) {
+ // Found a single node. To be safe, let us verify it is a node and
+ // not a rack with a single node.
+ FSSchedulerNode node = nodes.get(0);
+ if (node.getNodeName().equals(resourceName)) {
+ return visitNode(node.getRackName());
+ }
+ }
+
+ // At this point, it is not ANY or a node. Must be a rack
+ return visitRack(resourceName);
+ }
+ }
+}
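
A sketch of the visit semantics for three hypothetical RRs that share a priority and capability: nodeLocalRR names node "host1" on "rack1", rackLocalRR names "rack1", and anyRR names ResourceRequest.ANY.

    VisitedResourceRequestTracker tracker =
        new VisitedResourceRequestTracker(scheduler.getNodeTracker());
    tracker.visit(nodeLocalRR); // true: first visit; marks rack1 as having
                                // a visited node
    tracker.visit(rackLocalRR); // false: rack1 already has a visited node
    tracker.visit(anyRR);       // false: a node/rack was already visited
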
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
index 25780cdcd28..706cdc9034c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
@@ -21,6 +21,8 @@ import java.util.HashSet;
import java.util.Set;
public class FairSchedulerWithMockPreemption extends FairScheduler {
+ static final long DELAY_FOR_NEXT_STARVATION_CHECK_MS = 10 * 60 * 1000;
+
@Override
protected void createPreemptionThread() {
preemptionThread = new MockPreemptionThread(this);
@@ -30,7 +32,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
private Set<FSAppAttempt> appsAdded = new HashSet<>();
private int totalAppsAdded = 0;
- MockPreemptionThread(FairScheduler scheduler) {
+ private MockPreemptionThread(FairScheduler scheduler) {
super(scheduler);
}
@@ -41,6 +43,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
FSAppAttempt app = context.getStarvedApps().take();
appsAdded.add(app);
totalAppsAdded++;
+ app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS);
} catch (InterruptedException e) {
return;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
index a5b2d868d40..3a79ac0891b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import static org.junit.Assert.assertEquals;
@@ -43,6 +44,8 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
+ private final ControlledClock clock = new ControlledClock();
+
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
private static final String[] QUEUES =
@@ -99,11 +102,17 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
+ "minshare and fairshare queues",
3, preemptionThread.uniqueAppsAdded());
- // Verify the apps get added again on a subsequent update
+ // Verify apps are added again only after the set delay for starvation has
+ // passed.
+ clock.tickSec(1);
scheduler.update();
- Thread.yield();
-
+ assertEquals("Apps re-added even before starvation delay passed",
+ preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded());
verifyLeafQueueStarvation();
+
+ clock.tickMsec(
+ FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS);
+ scheduler.update();
assertTrue("Each app is marked as starved exactly once",
preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
}
@@ -141,7 +150,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
sendEnoughNodeUpdatesToAssignFully();
// Sleep to hit the preemption timeouts
- Thread.sleep(10);
+ clock.tickMsec(10);
// Scheduler update to populate starved apps
scheduler.update();
@@ -208,8 +217,9 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
ALLOC_FILE.exists());
resourceManager = new MockRM(conf);
- resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+ scheduler.setClock(clock);
+ resourceManager.start();
preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
scheduler.preemptionThread;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 16df1edd3c5..a4d69bf2214 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -49,6 +50,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
private static final int GB = 1024;
+ // Scheduler clock
+ private final ControlledClock clock = new ControlledClock();
+
// Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
private static final int NODE_CAPACITY_MULTIPLE = 4;
@@ -60,25 +64,28 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
// Starving app that is expected to instigate preemption
private FSAppAttempt starvingApp;
- @Parameterized.Parameters
- public static Collection<Boolean[]> getParameters() {
- return Arrays.asList(new Boolean[][] {
- {true}, {false}});
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection