From 46b6c95e0a7f771e4eba414d07dcad8b71d978c0 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 15 Feb 2017 23:16:01 -0800 Subject: [PATCH] YARN-6163. FS Preemption is a trickle for severely starved applications. (kasha) (cherry picked from commit 6c25dbcdc0517a825b92fb16444aa1d3761e160c) --- .../hadoop/yarn/util/resource/Resources.java | 18 +++ .../scheduler/AbstractYarnScheduler.java | 4 + .../scheduler/fair/FSAppAttempt.java | 110 +++++++++++-- .../scheduler/fair/FSLeafQueue.java | 111 +++++++++---- .../scheduler/fair/FSPreemptionThread.java | 125 ++++++++------- .../scheduler/fair/FairScheduler.java | 4 + .../fair/FairSchedulerConfiguration.java | 23 ++- .../fair/VisitedResourceRequestTracker.java | 146 ++++++++++++++++++ .../fair/FairSchedulerWithMockPreemption.java | 5 +- .../scheduler/fair/TestFSAppStarvation.java | 20 ++- .../fair/TestFairSchedulerPreemption.java | 45 +++--- .../TestVisitedResourceRequestTracker.java | 112 ++++++++++++++ 12 files changed, 584 insertions(+), 139 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index 5e623d9d6bc..b2f17c4e07e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -182,6 +182,24 @@ public class Resources { return subtractFrom(clone(lhs), rhs); } + /** + * Subtract rhs from lhs and reset any negative + * values to zero. 
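+   *
+   * For example, subtracting (4096 MB, 4 vcores) from (2048 MB, 8 vcores)
+   * yields (0 MB, 4 vcores) rather than a negative memory size.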
+   * @param lhs {@link Resource} to subtract from
+   * @param rhs {@link Resource} to subtract
+   * @return the value of lhs after subtraction
+   */
+  public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
+    subtractFrom(lhs, rhs);
+    if (lhs.getMemorySize() < 0) {
+      lhs.setMemorySize(0);
+    }
+    if (lhs.getVirtualCores() < 0) {
+      lhs.setVirtualCores(0);
+    }
+    return lhs;
+  }
+
   public static Resource negate(Resource resource) {
     return subtract(NONE, resource);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index dc376a33620..3851d20e36b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -120,6 +120,7 @@ public abstract class AbstractYarnScheduler
    */
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
+  protected long nmHeartbeatInterval;
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -156,6 +157,9 @@ public abstract class AbstractYarnScheduler
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    nmHeartbeatInterval =
+        conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
     long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index f67aa3c005d..28c599ebdfb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -86,6 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private Resource fairshareStarvation = Resources.none();
   private long lastTimeAtFairShare;
+  private long nextStarvationCheck;
 
   // minShareStarvation attributed to this application by the leaf queue
   private Resource minshareStarvation = Resources.none();
@@ -210,15 +211,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
     }
     for (FSSchedulerNode node: blacklistNodeIds) {
-      Resources.subtractFrom(availableResources,
+      Resources.subtractFromNonNegative(availableResources,
           node.getUnallocatedResource());
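+      // The clamped subtraction keeps headroom non-negative, replacing the
+      // explicit reset of negative values that is removed below.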
     }
-    if (availableResources.getMemorySize() < 0) {
-      availableResources.setMemorySize(0);
-    }
-    if (availableResources.getVirtualCores() < 0) {
-      availableResources.setVirtualCores(0);
-    }
   }
 
   /**
@@ -528,6 +523,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return Resources.add(fairshareStarvation, minshareStarvation);
   }
 
+  /**
+   * Get last computed fairshare starvation.
+   *
+   * @return last computed fairshare starvation
+   */
+  Resource getFairshareStarvation() {
+    return fairshareStarvation;
+  }
+
   /**
    * Set the minshare attributed to this application. To be called only from
    * {@link FSLeafQueue#updateStarvedApps}.
@@ -1066,17 +1070,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   /**
-   * Helper method that computes the extent of fairshare fairshareStarvation.
+   * Helper method that computes the extent of fairshare starvation.
+   * @return freshly computed fairshare starvation
    */
   Resource fairShareStarvation() {
     Resource threshold = Resources.multiply(
         getFairShare(), fsQueue.getFairSharePreemptionThreshold());
-    Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+    Resource starvation = Resources.componentwiseMin(threshold, demand);
+    Resources.subtractFromNonNegative(starvation, getResourceUsage());
 
     long now = scheduler.getClock().getTime();
-    boolean starved = Resources.greaterThan(
-        fsQueue.getPolicy().getResourceCalculator(),
-        scheduler.getClusterResource(), starvation, Resources.none());
+    boolean starved = !Resources.isNone(starvation);
 
     if (!starved) {
       lastTimeAtFairShare = now;
@@ -1104,6 +1108,81 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return !Resources.isNone(fairshareStarvation);
   }
 
+  /**
+   * Fetch a list of RRs corresponding to the extent the app is starved
+   * (fairshare and minshare). This method considers the number of containers
+   * in a RR and also only one locality-level (the first encountered
+   * resourceName).
+   *
+   * @return list of {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   */
+  List<ResourceRequest> getStarvedResourceRequests() {
+    // List of RRs we build in this method to return
+    List<ResourceRequest> ret = new ArrayList<>();
+
+    // Track visited RRs to avoid the same RR at multiple locality levels
+    VisitedResourceRequestTracker visitedRRs =
+        new VisitedResourceRequestTracker(scheduler.getNodeTracker());
+
+    // Start with current starvation and track the pending amount
+    Resource pending = getStarvation();
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      if (Resources.isNone(pending)) {
+        // Found enough RRs to match the starvation
+        break;
+      }
+
+      // See if we have already seen this RR
+      if (!visitedRRs.visit(rr)) {
+        continue;
+      }
+
+      // A RR can have multiple containers of a capability. We need to
+      // compute the number of containers that fit in "pending".
+      int numContainersThatFit = (int) Math.floor(
+          Resources.ratio(scheduler.getResourceCalculator(),
+              pending, rr.getCapability()));
+      if (numContainersThatFit == 0) {
+        // This RR's capability is too large to fit in pending
+        continue;
+      }
+
+      // If the RR is only partially being satisfied, include only the
+      // partial number of containers.
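+      // For example, 10240 MB pending against a 3072 MB capability yields
+      // floor(10240 / 3072) = 3 containers, even if the RR asked for more.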
+ if (numContainersThatFit < rr.getNumContainers()) { + rr = ResourceRequest.newInstance(rr.getPriority(), + rr.getResourceName(), rr.getCapability(), numContainersThatFit); + } + + // Add the RR to return list and adjust "pending" accordingly + ret.add(rr); + Resources.subtractFromNonNegative(pending, + Resources.multiply(rr.getCapability(), rr.getNumContainers())); + } + + return ret; + } + + /** + * Notify this app that preemption has been triggered to make room for + * outstanding demand. The app should not be considered starved until after + * the specified delay. + * + * @param delayBeforeNextStarvationCheck duration to wait + */ + void preemptionTriggered(long delayBeforeNextStarvationCheck) { + nextStarvationCheck = + scheduler.getClock().getTime() + delayBeforeNextStarvationCheck; + } + + /** + * Whether this app's starvation should be considered. + */ + boolean shouldCheckForStarvation() { + return scheduler.getClock().getTime() >= nextStarvationCheck; + } + /* Schedulable methods implementation */ @Override @@ -1116,6 +1195,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return demand; } + /** + * Get the current app's unsatisfied demand. + */ + Resource getPendingDemand() { + return Resources.subtract(demand, getResourceUsage()); + } + @Override public long getStartTime() { return startTime; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index f55e1b59471..30295cd6e60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -219,6 +219,63 @@ public class FSLeafQueue extends FSQueue { } } + /** + * Compute the extent of fairshare starvation for a set of apps. 
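+   * Per resource component, an app's fairshare starvation is
+   * max(0, min(fairshare * preemption-threshold, demand) - usage).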
+ * + * @param appsWithDemand apps to compute fairshare starvation for + * @return aggregate fairshare starvation for all apps + */ + private Resource updateStarvedAppsFairshare( + TreeSet appsWithDemand) { + Resource fairShareStarvation = Resources.clone(none()); + // Fetch apps with unmet demand sorted by fairshare starvation + for (FSAppAttempt app : appsWithDemand) { + Resource appStarvation = app.fairShareStarvation(); + if (!Resources.isNone(appStarvation)) { + context.getStarvedApps().addStarvedApp(app); + Resources.addTo(fairShareStarvation, appStarvation); + } else { + break; + } + } + return fairShareStarvation; + } + + /** + * Distribute minshare starvation to a set of apps + * @param appsWithDemand set of apps + * @param minShareStarvation minshare starvation to distribute + */ + private void updateStarvedAppsMinshare( + final TreeSet appsWithDemand, + final Resource minShareStarvation) { + Resource pending = Resources.clone(minShareStarvation); + + // Keep adding apps to the starved list until the unmet demand goes over + // the remaining minshare + for (FSAppAttempt app : appsWithDemand) { + if (!Resources.isNone(pending)) { + Resource appMinShare = app.getPendingDemand(); + Resources.subtractFromNonNegative( + appMinShare, app.getFairshareStarvation()); + + if (Resources.greaterThan(policy.getResourceCalculator(), + scheduler.getClusterResource(), appMinShare, pending)) { + Resources.subtractFromNonNegative(appMinShare, pending); + pending = none(); + } else { + Resources.subtractFromNonNegative(pending, appMinShare); + } + app.setMinshareStarvation(appMinShare); + context.getStarvedApps().addStarvedApp(app); + } else { + // Reset minshare starvation in case we had set it in a previous + // iteration + app.resetMinshareStarvation(); + } + } + } + /** * Helper method to identify starved applications. This needs to be called * ONLY from {@link #updateInternal}, after the application shares @@ -237,44 +294,20 @@ public class FSLeafQueue extends FSQueue { * starved due to fairshare, there might still be starved applications. 
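+   *
+   * Fairshare starvation is computed first; any minshare starvation not
+   * already subsumed by it is then distributed across the same apps in
+   * {@link #updateStarvedAppsMinshare}.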
*/ private void updateStarvedApps() { - // First identify starved applications and track total amount of - // starvation (in resources) - Resource fairShareStarvation = Resources.clone(none()); + // Fetch apps with pending demand + TreeSet appsWithDemand = fetchAppsWithDemand(false); - // Fetch apps with unmet demand sorted by fairshare starvation - TreeSet appsWithDemand = fetchAppsWithDemand(); - for (FSAppAttempt app : appsWithDemand) { - Resource appStarvation = app.fairShareStarvation(); - if (!Resources.equals(Resources.none(), appStarvation)) { - context.getStarvedApps().addStarvedApp(app); - Resources.addTo(fairShareStarvation, appStarvation); - } else { - break; - } - } + // Process apps with fairshare starvation + Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand); // Compute extent of minshare starvation Resource minShareStarvation = minShareStarvation(); // Compute minshare starvation that is not subsumed by fairshare starvation - Resources.subtractFrom(minShareStarvation, fairShareStarvation); + Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation); - // Keep adding apps to the starved list until the unmet demand goes over - // the remaining minshare - for (FSAppAttempt app : appsWithDemand) { - if (Resources.greaterThan(policy.getResourceCalculator(), - scheduler.getClusterResource(), minShareStarvation, none())) { - Resource appPendingDemand = - Resources.subtract(app.getDemand(), app.getResourceUsage()); - Resources.subtractFrom(minShareStarvation, appPendingDemand); - app.setMinshareStarvation(appPendingDemand); - context.getStarvedApps().addStarvedApp(app); - } else { - // Reset minshare starvation in case we had set it in a previous - // iteration - app.resetMinshareStarvation(); - } - } + // Assign this minshare to apps with pending demand over fairshare + updateStarvedAppsMinshare(appsWithDemand, minShareStarvation); } @Override @@ -352,7 +385,7 @@ public class FSLeafQueue extends FSQueue { return assigned; } - for (FSAppAttempt sched : fetchAppsWithDemand()) { + for (FSAppAttempt sched : fetchAppsWithDemand(true)) { if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) { continue; } @@ -368,14 +401,24 @@ public class FSLeafQueue extends FSQueue { return assigned; } - private TreeSet fetchAppsWithDemand() { + /** + * Fetch the subset of apps that have unmet demand. When used for + * preemption-related code (as opposed to allocation), omits apps that + * should not be checked for starvation. 
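+   * Skipping apps inside their post-preemption delay prevents re-flagging
+   * an app as starved while containers preempted on its behalf are still
+   * being reclaimed and allocated.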
+ * + * @param assignment whether the apps are for allocation containers, as + * opposed to preemption calculations + * @return Set of apps with unmet demand + */ + private TreeSet fetchAppsWithDemand(boolean assignment) { TreeSet pendingForResourceApps = new TreeSet<>(policy.getComparator()); readLock.lock(); try { for (FSAppAttempt app : runnableApps) { Resource pending = app.getAppAttemptResourceUsage().getPending(); - if (!pending.equals(none())) { + if (!Resources.isNone(pending) && + (assignment || app.shouldCheckForStarvation())) { pendingForResourceApps.add(app); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index 8b4e4bd9293..af73c10f796 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -41,20 +41,26 @@ class FSPreemptionThread extends Thread { protected final FSContext context; private final FairScheduler scheduler; private final long warnTimeBeforeKill; + private final long delayBeforeNextStarvationCheck; private final Timer preemptionTimer; FSPreemptionThread(FairScheduler scheduler) { + setDaemon(true); + setName("FSPreemptionThread"); this.scheduler = scheduler; this.context = scheduler.getContext(); FairSchedulerConfiguration fsConf = scheduler.getConf(); context.setPreemptionEnabled(); context.setPreemptionUtilizationThreshold( fsConf.getPreemptionUtilizationThreshold()); - warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill(); preemptionTimer = new Timer("Preemption Timer", true); - setDaemon(true); - setName("FSPreemptionThread"); + warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill(); + long allocDelay = (fsConf.isContinuousSchedulingEnabled() + ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs + : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats + delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay + + fsConf.getWaitTimeBeforeNextStarvationCheck(); } public void run() { @@ -62,13 +68,8 @@ class FSPreemptionThread extends Thread { FSAppAttempt starvedApp; try{ starvedApp = context.getStarvedApps().take(); - if (!Resources.isNone(starvedApp.getStarvation())) { - PreemptableContainers containers = - identifyContainersToPreempt(starvedApp); - if (containers != null) { - preemptContainers(containers.containers); - } - } + preemptContainers(identifyContainersToPreempt(starvedApp)); + starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); } catch (InterruptedException e) { LOG.info("Preemption thread interrupted! Exiting."); return; @@ -77,55 +78,57 @@ class FSPreemptionThread extends Thread { } /** - * Given an app, identify containers to preempt to satisfy the app's next - * resource request. + * Given an app, identify containers to preempt to satisfy the app's + * starvation. + * + * Mechanics: + * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of + * starvation. + * 2. 
For each {@link ResourceRequest}, iterate through matching + * nodes and identify containers to preempt all on one node, also + * optimizing for least number of AM container preemptions. * * @param starvedApp starved application for which we are identifying * preemption targets - * @return list of containers to preempt to satisfy starvedApp, null if the - * app cannot be satisfied by preempting any running containers + * @return list of containers to preempt to satisfy starvedApp */ - private PreemptableContainers identifyContainersToPreempt( + private List identifyContainersToPreempt( FSAppAttempt starvedApp) { - PreemptableContainers bestContainers = null; + List containersToPreempt = new ArrayList<>(); - // Find the nodes that match the next resource request - ResourceRequest request = starvedApp.getNextResourceRequest(); - // TODO (KK): Should we check other resource requests if we can't match - // the first one? + // Iterate through enough RRs to address app's starvation + for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) { + for (int i = 0; i < rr.getNumContainers(); i++) { + PreemptableContainers bestContainers = null; + List potentialNodes = scheduler.getNodeTracker() + .getNodesByResourceName(rr.getResourceName()); + for (FSSchedulerNode node : potentialNodes) { + // TODO (YARN-5829): Attempt to reserve the node for starved app. + if (isNodeAlreadyReserved(node, starvedApp)) { + continue; + } - Resource requestCapability = request.getCapability(); - List potentialNodes = - scheduler.getNodeTracker().getNodesByResourceName( - request.getResourceName()); + int maxAMContainers = bestContainers == null ? + Integer.MAX_VALUE : bestContainers.numAMContainers; + PreemptableContainers preemptableContainers = + identifyContainersToPreemptOnNode( + rr.getCapability(), node, maxAMContainers); + if (preemptableContainers != null) { + // This set is better than any previously identified set. + bestContainers = preemptableContainers; + if (preemptableContainers.numAMContainers == 0) { + break; + } + } + } // End of iteration through nodes for one RR - // From the potential nodes, pick a node that has enough containers - // from apps over their fairshare - for (FSSchedulerNode node : potentialNodes) { - // TODO (YARN-5829): Attempt to reserve the node for starved app. The - // subsequent if-check needs to be reworked accordingly. - FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); - if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) { - // This node is already reserved by another app. Let us not consider - // this for preemption. - continue; - } - - int maxAMContainers = bestContainers == null ? - Integer.MAX_VALUE : bestContainers.numAMContainers; - PreemptableContainers preemptableContainers = - identifyContainersToPreemptOnNode(requestCapability, node, - maxAMContainers); - if (preemptableContainers != null) { - if (preemptableContainers.numAMContainers == 0) { - return preemptableContainers; - } else { - bestContainers = preemptableContainers; + if (bestContainers != null && bestContainers.containers.size() > 0) { + containersToPreempt.addAll(bestContainers.containers); + trackPreemptionsAgainstNode(bestContainers.containers); } } - } - - return bestContainers; + } // End of iteration over RRs + return containersToPreempt; } /** @@ -176,23 +179,25 @@ class FSPreemptionThread extends Thread { return null; } - private void preemptContainers(List containers) { - // Mark the containers as being considered for preemption on the node. 
- // Make sure the containers are subsequently removed by calling - // FSSchedulerNode#removeContainerForPreemption. - if (containers.size() > 0) { - FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker() - .getNode(containers.get(0).getNodeId()); - node.addContainersForPreemption(containers); - } + private boolean isNodeAlreadyReserved( + FSSchedulerNode node, FSAppAttempt app) { + FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable(); + return nodeReservedApp != null && !nodeReservedApp.equals(app); + } + private void trackPreemptionsAgainstNode(List containers) { + FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker() + .getNode(containers.get(0).getNodeId()); + node.addContainersForPreemption(containers); + } + + private void preemptContainers(List containers) { // Warn application about containers to be killed for (RMContainer container : containers) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); - FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container " + container + - " from queue " + queue.getName()); + " from queue " + app.getQueueName()); app.trackContainerForPreemption(container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index f75f2ddd8f0..5594140b20a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1772,4 +1772,8 @@ public class FairScheduler extends public float getReservableNodesRatio() { return reservableNodesRatio; } + + long getNMHeartbeatInterval() { + return nmHeartbeatInterval; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index b18dd7dee42..8e8e37b2dbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -114,12 +114,24 @@ public class FairSchedulerConfiguration extends Configuration { protected static final String PREEMPTION_THRESHOLD = CONF_PREFIX + "preemption.cluster-utilization-threshold"; protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f; - - protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval"; - protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000; + protected static final String WAIT_TIME_BEFORE_KILL = 
CONF_PREFIX + "waitTimeBeforeKill"; protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000; + /** + * Configurable delay (ms) before an app's starvation is considered after + * it is identified. This is to give the scheduler enough time to + * allocate containers post preemption. This delay is added to the + * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats. + * + * This is intended to be a backdoor on production clusters, and hence + * intentionally not documented. + */ + protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = + CONF_PREFIX + "waitTimeBeforeNextStarvationCheck"; + protected static final long + DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000; + /** Whether to assign multiple containers in one check-in. */ public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple"; protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false; @@ -251,8 +263,9 @@ public class FairSchedulerConfiguration extends Configuration { "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler"); } - public int getPreemptionInterval() { - return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL); + public long getWaitTimeBeforeNextStarvationCheck() { + return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS, + DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS); } public int getWaitTimeBeforeKill() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java new file mode 100644 index 00000000000..f157263eb5d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Applications place {@link ResourceRequest}s at multiple levels. 
This is a
+ * helper class that allows tracking if a {@link ResourceRequest} has been
+ * visited at a different locality level (node, rack, or ANY).
+ *
+ * This is implemented for {@link FSAppAttempt#getStarvedResourceRequests()}.
+ * The implementation is not thread-safe.
+ */
+class VisitedResourceRequestTracker {
+  private static final Log LOG =
+      LogFactory.getLog(VisitedResourceRequestTracker.class);
+  private final Map<Priority, Map<Resource, TrackerPerPriorityResource>> map =
+      new HashMap<>();
+  private final ClusterNodeTracker<FSSchedulerNode> nodeTracker;
+
+  VisitedResourceRequestTracker(
+      ClusterNodeTracker<FSSchedulerNode> nodeTracker) {
+    this.nodeTracker = nodeTracker;
+  }
+
+  /**
+   * Check if the {@link ResourceRequest} is visited before, and track it.
+   * @param rr {@link ResourceRequest} to visit
+   * @return true if rr is the first visit across all
+   * locality levels, false otherwise
+   */
+  boolean visit(ResourceRequest rr) {
+    Priority priority = rr.getPriority();
+    Resource capability = rr.getCapability();
+
+    Map<Resource, TrackerPerPriorityResource> subMap = map.get(priority);
+    if (subMap == null) {
+      subMap = new HashMap<>();
+      map.put(priority, subMap);
+    }
+
+    TrackerPerPriorityResource tracker = subMap.get(capability);
+    if (tracker == null) {
+      tracker = new TrackerPerPriorityResource();
+      subMap.put(capability, tracker);
+    }
+
+    return tracker.visit(rr.getResourceName());
+  }
+
+  private class TrackerPerPriorityResource {
+    private Set<String> racksWithNodesVisited = new HashSet<>();
+    private Set<String> racksVisited = new HashSet<>();
+    private boolean anyVisited;
+
+    private boolean visitAny() {
+      if (racksVisited.isEmpty() && racksWithNodesVisited.isEmpty()) {
+        anyVisited = true;
+      }
+      return anyVisited;
+    }
+
+    private boolean visitRack(String rackName) {
+      if (anyVisited || racksWithNodesVisited.contains(rackName)) {
+        return false;
+      } else {
+        racksVisited.add(rackName);
+        return true;
+      }
+    }
+
+    private boolean visitNode(String rackName) {
+      if (anyVisited || racksVisited.contains(rackName)) {
+        return false;
+      } else {
+        racksWithNodesVisited.add(rackName);
+        return true;
+      }
+    }
+
+    /**
+     * Based on whether resourceName is a node, rack or ANY,
+     * check if this has been visited earlier.
+     *
+     * A node is considered visited if its rack or ANY have been visited.
+     * A rack is considered visited if any of its nodes or ANY have been
+     * visited. ANY is considered visited if any of the nodes/racks have
+     * been visited.
+     *
+     * @param resourceName nodename or rackname or ANY
+     * @return true if this is the first visit, false otherwise
+     */
+    private boolean visit(String resourceName) {
+      if (resourceName.equals(ResourceRequest.ANY)) {
+        return visitAny();
+      }
+
+      List<FSSchedulerNode> nodes =
+          nodeTracker.getNodesByResourceName(resourceName);
+      int numNodes = nodes.size();
+      if (numNodes == 0) {
+        LOG.error("Found ResourceRequest for a non-existent node/rack named " +
+            resourceName);
+        return false;
+      }
+
+      if (numNodes == 1) {
+        // Found a single node. To be safe, let us verify it is a node and
+        // not a rack with a single node.
+        FSSchedulerNode node = nodes.get(0);
+        if (node.getNodeName().equals(resourceName)) {
+          return visitNode(node.getRackName());
+        }
+      }
+
+      // At this point, it is not ANY or a node. 
Must be a rack + return visitRack(resourceName); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java index 25780cdcd28..706cdc9034c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java @@ -21,6 +21,8 @@ import java.util.HashSet; import java.util.Set; public class FairSchedulerWithMockPreemption extends FairScheduler { + static final long DELAY_FOR_NEXT_STARVATION_CHECK_MS = 10 * 60 * 1000; + @Override protected void createPreemptionThread() { preemptionThread = new MockPreemptionThread(this); @@ -30,7 +32,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler { private Set appsAdded = new HashSet<>(); private int totalAppsAdded = 0; - MockPreemptionThread(FairScheduler scheduler) { + private MockPreemptionThread(FairScheduler scheduler) { super(scheduler); } @@ -41,6 +43,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler { FSAppAttempt app = context.getStarvedApps().take(); appsAdded.add(app); totalAppsAdded++; + app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS); } catch (InterruptedException e) { return; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java index a5b2d868d40..3a79ac0891b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.After; import static org.junit.Assert.assertEquals; @@ -43,6 +44,8 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES"); + private final ControlledClock clock = new ControlledClock(); + // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; private static final String[] QUEUES = @@ -99,11 +102,17 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { + "minshare and fairshare queues", 3, 
preemptionThread.uniqueAppsAdded()); - // Verify the apps get added again on a subsequent update + // Verify apps are added again only after the set delay for starvation has + // passed. + clock.tickSec(1); scheduler.update(); - Thread.yield(); - + assertEquals("Apps re-added even before starvation delay passed", + preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded()); verifyLeafQueueStarvation(); + + clock.tickMsec( + FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS); + scheduler.update(); assertTrue("Each app is marked as starved exactly once", preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded()); } @@ -141,7 +150,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { sendEnoughNodeUpdatesToAssignFully(); // Sleep to hit the preemption timeouts - Thread.sleep(10); + clock.tickMsec(10); // Scheduler update to populate starved apps scheduler.update(); @@ -208,8 +217,9 @@ public class TestFSAppStarvation extends FairSchedulerTestBase { ALLOC_FILE.exists()); resourceManager = new MockRM(conf); - resourceManager.start(); scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + scheduler.setClock(clock); + resourceManager.start(); preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread) scheduler.preemptionThread; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 16df1edd3c5..a4d69bf2214 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.util.ControlledClock; import org.junit.After; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -49,6 +50,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues"); private static final int GB = 1024; + // Scheduler clock + private final ControlledClock clock = new ControlledClock(); + // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; @@ -60,25 +64,28 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { // Starving app that is expected to instigate preemption private FSAppAttempt starvingApp; - @Parameterized.Parameters - public static Collection getParameters() { - return Arrays.asList(new Boolean[][] { - {true}, {false}}); + @Parameterized.Parameters(name = "{0}") + public static Collection getParameters() { + return Arrays.asList(new Object[][] { + {"FairSharePreemption", 
true}, + {"MinSharePreemption", false}}); } - public TestFairSchedulerPreemption(Boolean fairshare) throws IOException { + public TestFairSchedulerPreemption(String name, boolean fairshare) + throws IOException { fairsharePreemption = fairshare; writeAllocFile(); } @Before - public void setup() { + public void setup() throws IOException { createConfiguration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE.getAbsolutePath()); conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0); + setupCluster(); } @After @@ -166,8 +173,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { private void setupCluster() throws IOException { resourceManager = new MockRM(conf); - resourceManager.start(); scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + scheduler.setClock(clock); + resourceManager.start(); // Create and add two nodes to the cluster addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE); @@ -197,7 +205,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { * * @param queueName queue name */ - private void takeAllResource(String queueName) { + private void takeAllResources(String queueName) { // Create an app that takes up all the resources on the cluster ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, 1, queueName, "default", @@ -227,8 +235,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2); starvingApp = scheduler.getSchedulerApp(appAttemptId); - // Sleep long enough to pass - Thread.sleep(10); + // Move clock enough to identify starvation + clock.tickSec(1); scheduler.update(); } @@ -243,14 +251,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { */ private void submitApps(String queue1, String queue2) throws InterruptedException { - takeAllResource(queue1); + takeAllResources(queue1); preemptHalfResources(queue2); } private void verifyPreemption() throws InterruptedException { - // Sleep long enough for four containers to be preempted. Note that the - // starved app must be queued four times for containers to be preempted. - for (int i = 0; i < 10000; i++) { + // Sleep long enough for four containers to be preempted. + for (int i = 0; i < 100; i++) { if (greedyApp.getLiveContainers().size() == 4) { break; } @@ -268,7 +275,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { private void verifyNoPreemption() throws InterruptedException { // Sleep long enough to ensure not even one container is preempted. 
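+    // With the tests now driving a controlled clock, a 100-iteration bound
+    // keeps this check fast while still catching stray preemptions.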
-    for (int i = 0; i < 600; i++) {
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() != 8) {
         break;
       }
@@ -279,7 +286,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionWithinSameLeafQueue() throws Exception {
-    setupCluster();
     String queue = "root.preemptable.child-1";
     submitApps(queue, queue);
     if (fairsharePreemption) {
@@ -291,21 +297,18 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
     verifyPreemption();
   }
 
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
     verifyPreemption();
   }
 
   @Test
   public void testNoPreemptionFromDisallowedQueue() throws Exception {
-    setupCluster();
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     verifyNoPreemption();
   }
@@ -331,9 +334,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
-    setupCluster();
-
-    takeAllResource("root.preemptable.child-1");
+    takeAllResources("root.preemptable.child-1");
     setNumAMContainersPerNode(2);
     preemptHalfResources("root.preemptable.child-2");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
new file mode 100644
index 00000000000..07b849806fe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +import java.util.List; + +public class TestVisitedResourceRequestTracker { + private final ClusterNodeTracker + nodeTracker = new ClusterNodeTracker<>(); + private final ResourceRequest + anyRequest, rackRequest, node1Request, node2Request; + + private final String NODE_VISITED = "The node is already visited. "; + private final String RACK_VISITED = "The rack is already visited. "; + private final String ANY_VISITED = "ANY is already visited. "; + private final String NODE_FAILURE = "The node is visited again."; + private final String RACK_FAILURE = "The rack is visited again."; + private final String ANY_FAILURE = "ANY is visited again."; + private final String FIRST_CALL_FAILURE = "First call to visit failed."; + + public TestVisitedResourceRequestTracker() { + List rmNodes = + MockNodes.newNodes(1, 2, Resources.createResource(8192, 8)); + + FSSchedulerNode node1 = new FSSchedulerNode(rmNodes.get(0), false); + nodeTracker.addNode(node1); + node1Request = createRR(node1.getNodeName(), 1); + + FSSchedulerNode node2 = new FSSchedulerNode(rmNodes.get(1), false); + node2Request = createRR(node2.getNodeName(), 1); + nodeTracker.addNode(node2); + + anyRequest = createRR(ResourceRequest.ANY, 2); + rackRequest = createRR(node1.getRackName(), 2); + } + + private ResourceRequest createRR(String resourceName, int count) { + return ResourceRequest.newInstance( + Priority.UNDEFINED, resourceName, Resources.none(), count); + } + + @Test + public void testVisitAnyRequestFirst() { + VisitedResourceRequestTracker tracker = + new VisitedResourceRequestTracker(nodeTracker); + + // Visit ANY request first + assertTrue(FIRST_CALL_FAILURE, tracker.visit(anyRequest)); + + // All other requests should return false + assertFalse(ANY_VISITED + RACK_FAILURE, tracker.visit(rackRequest)); + assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node1Request)); + assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node2Request)); + } + + @Test + public void testVisitRackRequestFirst() { + VisitedResourceRequestTracker tracker = + new VisitedResourceRequestTracker(nodeTracker); + + // Visit rack request first + assertTrue(FIRST_CALL_FAILURE, tracker.visit(rackRequest)); + + // All other requests should return false + assertFalse(RACK_VISITED + ANY_FAILURE, tracker.visit(anyRequest)); + assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node1Request)); + assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node2Request)); + } + + @Test + public void testVisitNodeRequestFirst() { + VisitedResourceRequestTracker tracker = + new VisitedResourceRequestTracker(nodeTracker); + + // Visit node1 first + assertTrue(FIRST_CALL_FAILURE, tracker.visit(node1Request)); + + // Rack and ANY should return false + assertFalse(NODE_VISITED + ANY_FAILURE, tracker.visit(anyRequest)); + assertFalse(NODE_VISITED + RACK_FAILURE, tracker.visit(rackRequest)); + + // The other node should return true + assertTrue(NODE_VISITED + "Different 
node visit failed", + tracker.visit(node2Request)); + } +}
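
To make the starved-request selection concrete outside the scheduler, a minimal standalone sketch of the loop in FSAppAttempt#getStarvedResourceRequests() follows. Res and RR are simplified stand-ins, not the YARN Resource and ResourceRequest types, and the memory-only containersThatFit() approximates Resources.ratio() with the scheduler's resource calculator; the real code additionally deduplicates locality levels through VisitedResourceRequestTracker.

```java
import java.util.ArrayList;
import java.util.List;

public class StarvedRequestsSketch {
  // Simplified stand-in for org.apache.hadoop.yarn.api.records.Resource
  record Res(long memoryMb, int vcores) {
    boolean isNone() { return memoryMb <= 0 && vcores <= 0; }
  }

  // Simplified stand-in for a ResourceRequest: capability x numContainers
  record RR(Res capability, int numContainers) {}

  // Whole containers of 'cap' that fit in 'pending' (memory-only stand-in
  // for Resources.ratio with a DefaultResourceCalculator)
  static int containersThatFit(Res pending, Res cap) {
    return (int) Math.floor((double) pending.memoryMb() / cap.memoryMb());
  }

  // Componentwise subtraction clamped at zero, like subtractFromNonNegative
  static Res subtractNonNegative(Res lhs, Res rhs) {
    return new Res(Math.max(0, lhs.memoryMb() - rhs.memoryMb()),
        Math.max(0, lhs.vcores() - rhs.vcores()));
  }

  // Trim a list of requests down to the app's current starvation
  static List<RR> starvedRequests(Res starvation, List<RR> requests) {
    List<RR> ret = new ArrayList<>();
    Res pending = starvation;
    for (RR rr : requests) {
      if (pending.isNone()) {
        break;            // found enough RRs to match the starvation
      }
      int fit = containersThatFit(pending, rr.capability());
      if (fit == 0) {
        continue;         // capability too large to fit in what is left
      }
      // Partially satisfied RR: keep only the containers that fit
      RR taken = fit < rr.numContainers()
          ? new RR(rr.capability(), fit) : rr;
      ret.add(taken);
      pending = subtractNonNegative(pending, new Res(
          taken.capability().memoryMb() * taken.numContainers(),
          taken.capability().vcores() * taken.numContainers()));
    }
    return ret;
  }

  public static void main(String[] args) {
    // Starved by 10 GB / 10 vcores; one RR asks for five 3 GB containers:
    // floor(10240 / 3072) = 3 of them are selected.
    System.out.println(starvedRequests(
        new Res(10240, 10), List.of(new RR(new Res(3072, 1), 5))));
  }
}
```

Trimming a partially satisfied request bounds preemption to exactly the app's starvation, which is what keeps the new per-RR preemption loop from reclaiming more containers than the starved app can absorb.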