diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index c05bff9a164..c32565f63c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -99,7 +99,10 @@ class FSPreemptionThread extends Thread {
    * starvation.
    * 2. For each {@link ResourceRequest}, iterate through matching
    *    nodes and identify containers to preempt all on one node, also
-   *    optimizing for least number of AM container preemptions.
+   *    optimizing for least number of AM container preemptions. Only nodes
+   *    that match the locality level specified in the {@link ResourceRequest}
+   *    are considered. However, if this would lead to AM preemption, and locality
+   *    relaxation is allowed, then the search space is expanded to all nodes.
    *
    * @param starvedApp starved application for which we are identifying
    *                   preemption targets
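The widened search described in the javadoc is driven by two fields on the ResourceRequest: its resource name (a specific node, a rack, or ResourceRequest.ANY) and its relax-locality flag. The snippet below is not part of the patch; it is a minimal sketch of how an application-side request sets those fields, with the host and rack names invented purely for illustration.

// Illustrative only; not part of the patch. Host and rack names are made up;
// the calls are the standard ResourceRequest factory methods.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class RelaxLocalityExample {
  public static void main(String[] args) {
    Resource capability = Resource.newInstance(1024, 1);
    Priority priority = Priority.newInstance(1);

    // Node-local ask; the final argument is relaxLocality. When true, the
    // scheduler may fall back to the rack or to ANY if the host is unusable,
    // which is the case the preemption code now exploits to spare AMs.
    ResourceRequest nodeLocal = ResourceRequest.newInstance(
        priority, "host-1.example.com", capability, 1, true);

    // Matching rack-level and off-switch asks for the same containers.
    ResourceRequest rackLocal = ResourceRequest.newInstance(
        priority, "/rack-1", capability, 1, true);
    ResourceRequest offSwitch = ResourceRequest.newInstance(
        priority, ResourceRequest.ANY, capability, 1, true);

    // isAnyLocation() is the check FSPreemptionThread uses to tell an
    // off-switch request apart from node- or rack-level ones.
    System.out.println(ResourceRequest.isAnyLocation(nodeLocal.getResourceName()));  // false
    System.out.println(ResourceRequest.isAnyLocation(offSwitch.getResourceName()));  // true
  }
}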
@@ -111,27 +114,21 @@ class FSPreemptionThread extends Thread {
 
     // Iterate through enough RRs to address app's starvation
     for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
+      List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
+          .getNodesByResourceName(rr.getResourceName());
       for (int i = 0; i < rr.getNumContainers(); i++) {
-        PreemptableContainers bestContainers = null;
-        List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
-            .getNodesByResourceName(rr.getResourceName());
-        int maxAMContainers = Integer.MAX_VALUE;
+        PreemptableContainers bestContainers =
+            identifyContainersToPreemptForOneContainer(potentialNodes, rr);
 
-        for (FSSchedulerNode node : potentialNodes) {
-          PreemptableContainers preemptableContainers =
-              identifyContainersToPreemptOnNode(
-                  rr.getCapability(), node, maxAMContainers);
-
-          if (preemptableContainers != null) {
-            // This set is better than any previously identified set.
-            bestContainers = preemptableContainers;
-            maxAMContainers = bestContainers.numAMContainers;
-
-            if (maxAMContainers == 0) {
-              break;
-            }
-          }
-        } // End of iteration through nodes for one RR
+        // Don't preempt AM containers just to satisfy local requests if relax
+        // locality is enabled.
+        if (bestContainers != null
+            && bestContainers.numAMContainers > 0
+            && !ResourceRequest.isAnyLocation(rr.getResourceName())
+            && rr.getRelaxLocality()) {
+          bestContainers = identifyContainersToPreemptForOneContainer(
+              scheduler.getNodeTracker().getAllNodes(), rr);
+        }
 
         if (bestContainers != null) {
           List<RMContainer> containers = bestContainers.getAllContainers();
@@ -154,6 +151,29 @@ class FSPreemptionThread extends Thread {
     return containersToPreempt;
   }
 
+  private PreemptableContainers identifyContainersToPreemptForOneContainer(
+      List<FSSchedulerNode> potentialNodes, ResourceRequest rr) {
+    PreemptableContainers bestContainers = null;
+    int maxAMContainers = Integer.MAX_VALUE;
+
+    for (FSSchedulerNode node : potentialNodes) {
+      PreemptableContainers preemptableContainers =
+          identifyContainersToPreemptOnNode(
+              rr.getCapability(), node, maxAMContainers);
+
+      if (preemptableContainers != null) {
+        // This set is better than any previously identified set.
+        bestContainers = preemptableContainers;
+        maxAMContainers = bestContainers.numAMContainers;
+
+        if (maxAMContainers == 0) {
+          break;
+        }
+      }
+    }
+    return bestContainers;
+  }
+
   /**
    * Identify containers to preempt on a given node. Try to find a list with
    * least AM containers to avoid preempting AM containers. This method returns
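The per-request rule that triggers the widened search can be read in isolation. The class below is not part of the patch: it is a self-contained restatement of that condition with the scheduler types replaced by a stub (Candidate standing in for PreemptableContainers), so the logic compiles and runs on its own.

// Illustrative only; not part of the patch. "Candidate" and
// "shouldWidenSearch" are stand-ins, not names used by FSPreemptionThread.
final class PreemptionFallbackRule {

  /** Result of searching one set of nodes; numAMContainers mirrors the
   *  field of the same name on PreemptableContainers. */
  static final class Candidate {
    final int numAMContainers;
    Candidate(int numAMContainers) { this.numAMContainers = numAMContainers; }
  }

  /**
   * The locality-restricted search is widened to all nodes only when a
   * candidate was found, it would preempt at least one AM container, the
   * request names a specific node or rack (not ANY), and the request allows
   * locality relaxation.
   */
  static boolean shouldWidenSearch(Candidate best, boolean requestIsAny,
      boolean relaxLocality) {
    return best != null
        && best.numAMContainers > 0
        && !requestIsAny
        && relaxLocality;
  }

  public static void main(String[] args) {
    // Node-local request whose only candidate would preempt an AM: widen.
    System.out.println(shouldWidenSearch(new Candidate(1), false, true));  // true
    // Same candidate for an ANY request: there is nothing wider to search.
    System.out.println(shouldWidenSearch(new Candidate(1), true, true));   // false
    // Candidate that preempts no AMs: keep it, no need to widen.
    System.out.println(shouldWidenSearch(new Candidate(0), false, true));  // false
  }
}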
takeAllResources("root.preemptable.child-1"); + RMNode node1 = rmNodes.get(0); + setAllAMContainersOnNode(node1.getNodeID()); + SchedulerNode node = scheduler.getNodeTracker().getNode(node1.getNodeID()); + ApplicationAttemptId greedyAppAttemptId = + node.getCopiedListOfRunningContainers().get(0) + .getApplicationAttemptId(); + + // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be + // satisfied. This forces the RR that we consider for preemption to be the + // NODE_LOCAL one. + ResourceRequest nodeRequest = + createResourceRequest(GB, node1.getHostName(), 1, 4, true); + ResourceRequest rackRequest = + createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true); + ResourceRequest anyRequest = + createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true); + + List resourceRequests = + Arrays.asList(nodeRequest, rackRequest, anyRequest); + + ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest( + "root.preemptable.child-2", "default", resourceRequests); + starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId); + + // Move clock enough to identify starvation + clock.tickSec(1); + scheduler.update(); + + // Make sure 4 containers were preempted from the greedy app, but also that + // none were preempted on our all-AM node, even though the NODE_LOCAL RR + // asked for resources on it. + + // TODO (YARN-7655) The starved app should be allocated 4 containers. + // It should be possible to modify the RRs such that this is true + // after YARN-7903. + verifyPreemption(0, 4); + for (RMContainer container : node.getCopiedListOfRunningContainers()) { + assert (container.isAMContainer()); + assert (container.getApplicationAttemptId().equals(greedyAppAttemptId)); + } + } + @Test public void testAppNotPreemptedBelowFairShare() throws Exception { takeAllResources("root.preemptable.child-1");