From e89941fdbb3b382eeb487d32e5194909610ac334 Mon Sep 17 00:00:00 2001
From: Yufei Gu
Date: Tue, 4 Dec 2018 10:08:45 -0800
Subject: [PATCH] YARN-9041. Performance Optimization of method
 FSPreemptionThread#identifyContainersToPreempt. Contributed by Wanqiang Ji.

---
 .../scheduler/fair/FSPreemptionThread.java   |  55 +++++--
 .../fair/TestFairSchedulerPreemption.java    | 155 ++++++++++++------
 2 files changed, 140 insertions(+), 70 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index 6ed90f816a5..4c830523cb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -97,12 +97,7 @@ public void run() {
    * Mechanics:
    * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
    * starvation.
-   * 2. For each {@link ResourceRequest}, iterate through matching
-   * nodes and identify containers to preempt all on one node, also
-   * optimizing for least number of AM container preemptions. Only nodes
-   * that match the locality level specified in the {@link ResourceRequest}
-   * are considered. However, if this would lead to AM preemption, and locality
-   * relaxation is allowed, then the search space is expanded to all nodes.
+   * 2. For each {@link ResourceRequest}, get the best preemptable containers.
    *
    * @param starvedApp starved application for which we are identifying
    * preemption targets
@@ -118,18 +113,7 @@ private List<RMContainer> identifyContainersToPreempt(
         .getNodesByResourceName(rr.getResourceName());
     for (int i = 0; i < rr.getNumContainers(); i++) {
       PreemptableContainers bestContainers =
-          identifyContainersToPreemptForOneContainer(potentialNodes, rr);
-
-      // Don't preempt AM containers just to satisfy local requests if relax
-      // locality is enabled.
-      if (bestContainers != null
-          && bestContainers.numAMContainers > 0
-          && !ResourceRequest.isAnyLocation(rr.getResourceName())
-          && rr.getRelaxLocality()) {
-        bestContainers = identifyContainersToPreemptForOneContainer(
-            scheduler.getNodeTracker().getAllNodes(), rr);
-      }
-
+          getBestPreemptableContainers(rr, potentialNodes);
       if (bestContainers != null) {
         List<RMContainer> containers = bestContainers.getAllContainers();
         if (containers.size() > 0) {
@@ -240,6 +224,41 @@ private void preemptContainers(List<RMContainer> containers) {
         new PreemptContainersTask(containers), warnTimeBeforeKill);
   }
 
+  /**
+   * Iterate through matching nodes and identify containers to preempt all on
+   * one node, also optimizing for least number of AM container preemptions.
+   * Only nodes that match the locality level specified in the
+   * {@link ResourceRequest} are considered. However, if this would lead to
+   * AM preemption, and locality relaxation is allowed, then the search space
+   * is expanded to the remaining nodes.
+   *
+   * @param rr resource request
+   * @param potentialNodes list of {@link FSSchedulerNode}
+   * @return the best preemptable containers for the resource request
+   */
+  private PreemptableContainers getBestPreemptableContainers(ResourceRequest rr,
+      List<FSSchedulerNode> potentialNodes) {
+    PreemptableContainers bestContainers =
+        identifyContainersToPreemptForOneContainer(potentialNodes, rr);
+
+    if (rr.getRelaxLocality()
+        && !ResourceRequest.isAnyLocation(rr.getResourceName())
+        && bestContainers != null
+        && bestContainers.numAMContainers > 0) {
+      List<FSSchedulerNode> remainingNodes =
+          scheduler.getNodeTracker().getAllNodes();
+      remainingNodes.removeAll(potentialNodes);
+      PreemptableContainers spareContainers =
+          identifyContainersToPreemptForOneContainer(remainingNodes, rr);
+      if (spareContainers != null && spareContainers.numAMContainers
+          < bestContainers.numAMContainers) {
+        bestContainers = spareContainers;
+      }
+    }
+
+    return bestContainers;
+  }
+
   private class PreemptContainersTask extends TimerTask {
     private final List<RMContainer> containers;
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index da6428a8b5a..67cb6004f64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -387,13 +387,6 @@ private void setNumAMContainersPerNode(int numAMContainersPerNode) {
     }
   }
 
-  private void setAllAMContainersOnNode(NodeId nodeId) {
-    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
-    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
-      ((RMContainerImpl) container).setAMContainer(true);
-    }
-  }
-
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
     takeAllResources("root.preemptable.child-1");
@@ -412,51 +405,6 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
         + "nodes.", !host0.equals(host1));
   }
 
-  @Test
-  public void testRelaxLocalityToNotPreemptAM() throws Exception {
-    takeAllResources("root.preemptable.child-1");
-    RMNode node1 = rmNodes.get(0);
-    setAllAMContainersOnNode(node1.getNodeID());
-    SchedulerNode node = scheduler.getNodeTracker().getNode(node1.getNodeID());
-    ApplicationAttemptId greedyAppAttemptId =
-        node.getCopiedListOfRunningContainers().get(0)
-            .getApplicationAttemptId();
-
-    // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
-    // satisfied. This forces the RR that we consider for preemption to be the
-    // NODE_LOCAL one.
-    ResourceRequest nodeRequest =
-        createResourceRequest(GB, node1.getHostName(), 1, 4, true);
-    ResourceRequest rackRequest =
-        createResourceRequest(GB * 10, node1.getRackName(), 1, 1, true);
-    ResourceRequest anyRequest =
-        createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
-
-    List<ResourceRequest> resourceRequests =
-        Arrays.asList(nodeRequest, rackRequest, anyRequest);
-
-    ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
-        "root.preemptable.child-2", "default", resourceRequests);
-    starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
-
-    // Move clock enough to identify starvation
-    clock.tickSec(1);
-    scheduler.update();
-
-    // Make sure 4 containers were preempted from the greedy app, but also that
-    // none were preempted on our all-AM node, even though the NODE_LOCAL RR
-    // asked for resources on it.
-
-    // TODO (YARN-7655) The starved app should be allocated 4 containers.
-    // It should be possible to modify the RRs such that this is true
-    // after YARN-7903.
-    verifyPreemption(0, 4);
-    for (RMContainer container : node.getCopiedListOfRunningContainers()) {
-      assert (container.isAMContainer());
-      assert (container.getApplicationAttemptId().equals(greedyAppAttemptId));
-    }
-  }
-
   @Test
   public void testAppNotPreemptedBelowFairShare() throws Exception {
     takeAllResources("root.preemptable.child-1");
@@ -492,4 +440,107 @@ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
     preemptHalfResources("root.preemptable.child-2");
     verifyPreemption(1, 2);
   }
+
+  /* Tests the case where the remaining nodes offer a solution that preempts
+   * fewer AM containers.
+   */
+  @Test
+  public void testRelaxLocalityPreemptionWithLessAMInRemainingNodes()
+      throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    RMNode node1 = rmNodes.get(0);
+    setAllAMContainersOnNode(node1.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        getGreedyAppAttemptIdOnNode(node1.getNodeID());
+    updateRelaxLocalityRequestSchedule(node1, GB, 4);
+    verifyRelaxLocalityPreemption(node1.getNodeID(), greedyAppAttemptId, 4);
+  }
+
+  /* Tests the case where the remaining nodes offer no solution that preempts
+   * fewer AM containers.
+   */
+  @Test
+  public void testRelaxLocalityPreemptionWithNoLessAMInRemainingNodes()
+      throws Exception {
+    takeAllResources("root.preemptable.child-1");
+    RMNode node1 = rmNodes.get(0);
+    setNumAMContainersOnNode(3, node1.getNodeID());
+    RMNode node2 = rmNodes.get(1);
+    setAllAMContainersOnNode(node2.getNodeID());
+    ApplicationAttemptId greedyAppAttemptId =
+        getGreedyAppAttemptIdOnNode(node2.getNodeID());
+    updateRelaxLocalityRequestSchedule(node1, GB * 2, 1);
+    verifyRelaxLocalityPreemption(node2.getNodeID(), greedyAppAttemptId, 6);
+  }
+
+  private void setAllAMContainersOnNode(NodeId nodeId) {
+    setNumAMContainersOnNode(Integer.MAX_VALUE, nodeId);
+  }
+
+  private void setNumAMContainersOnNode(int num, NodeId nodeId) {
+    int count = 0;
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    for (RMContainer container: node.getCopiedListOfRunningContainers()) {
+      count++;
+      if (count <= num) {
+        ((RMContainerImpl) container).setAMContainer(true);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private ApplicationAttemptId getGreedyAppAttemptIdOnNode(NodeId nodeId) {
+    SchedulerNode node = scheduler.getNodeTracker().getNode(nodeId);
+    return node.getCopiedListOfRunningContainers().get(0)
+        .getApplicationAttemptId();
+  }
+
+  /*
+   * Send resource requests that allow relaxed locality to the scheduler.
+   * Params node/nodeMemory/numNodeContainers apply to the NODE_LOCAL request.
+   */
+  private void updateRelaxLocalityRequestSchedule(RMNode node, int nodeMemory,
+      int numNodeContainers) {
+    // Make the RACK_LOCAL and OFF_SWITCH requests big enough that they can't be
+    // satisfied. This forces the RR that we consider for preemption to be the
+    // NODE_LOCAL one.
+    ResourceRequest nodeRequest = createResourceRequest(nodeMemory,
+        node.getHostName(), 1, numNodeContainers, true);
+    ResourceRequest rackRequest =
+        createResourceRequest(GB * 10, node.getRackName(), 1, 1, true);
+    ResourceRequest anyRequest =
+        createResourceRequest(GB * 10, ResourceRequest.ANY, 1, 1, true);
+
+    List<ResourceRequest> resourceRequests =
+        Arrays.asList(nodeRequest, rackRequest, anyRequest);
+
+    ApplicationAttemptId starvedAppAttemptId = createSchedulingRequest(
+        "root.preemptable.child-2", "default", resourceRequests);
+    starvingApp = scheduler.getSchedulerApp(starvedAppAttemptId);
+
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
+    scheduler.update();
+  }
+
+  private void verifyRelaxLocalityPreemption(NodeId notBePreemptedNodeId,
+      ApplicationAttemptId greedyAttemptId, int numGreedyAppContainers)
+      throws Exception {
+    // Make sure the expected number of containers were preempted from the
+    // greedy app, and that none were preempted on our all-AM node, even
+    // though the NODE_LOCAL RR asked for resources on it.
+
+    // TODO (YARN-7655) The starved app should be allocated 4 containers.
+    // It should be possible to modify the RRs such that this is true
+    // after YARN-7903.
+    verifyPreemption(0, numGreedyAppContainers);
+    SchedulerNode node = scheduler.getNodeTracker()
+        .getNode(notBePreemptedNodeId);
+    for (RMContainer container : node.getCopiedListOfRunningContainers()) {
+      assert(container.isAMContainer());
+      assert(container.getApplicationAttemptId().equals(greedyAttemptId));
+    }
+  }
+
 }
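Note: for readers who want the search strategy in isolation, the following is a minimal standalone sketch (not part of the patch) of what the new getBestPreemptableContainers() does: scan the locality-matching nodes first, and only fall back to the remaining nodes when the best match would preempt AM containers and locality may be relaxed. Node, Candidate, and cheapest() are simplified stand-ins for FSSchedulerNode, PreemptableContainers, and identifyContainersToPreemptForOneContainer(); none of the real scheduler wiring is shown.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RelaxLocalitySearchSketch {

  static class Node {
    final String name;
    final int amContainers; // AM containers a preemption on this node would hit
    Node(String name, int amContainers) {
      this.name = name;
      this.amContainers = amContainers;
    }
  }

  static class Candidate {
    final Node node;
    Candidate(Node node) {
      this.node = node;
    }
  }

  // Phase 1: search only the locality-matching nodes.
  // Phase 2: if the best solution still preempts AM containers and locality
  // may be relaxed, search the remaining nodes once and keep whichever
  // solution preempts fewer AM containers.
  static Candidate findBest(List<Node> matching, List<Node> allNodes,
      boolean relaxLocality) {
    Candidate best = cheapest(matching);
    if (relaxLocality && best != null && best.node.amContainers > 0) {
      List<Node> remaining = new ArrayList<>(allNodes);
      remaining.removeAll(matching); // avoid re-scanning the matching nodes
      Candidate spare = cheapest(remaining);
      if (spare != null && spare.node.amContainers < best.node.amContainers) {
        best = spare;
      }
    }
    return best;
  }

  // Stand-in for identifyContainersToPreemptForOneContainer(): pick the node
  // whose preemption would touch the fewest AM containers.
  static Candidate cheapest(List<Node> nodes) {
    Candidate best = null;
    for (Node n : nodes) {
      if (best == null || n.amContainers < best.node.amContainers) {
        best = new Candidate(n);
      }
    }
    return best;
  }

  public static void main(String[] args) {
    List<Node> matching = Arrays.asList(new Node("node1", 2));
    List<Node> all = new ArrayList<>(matching);
    all.add(new Node("node2", 0));
    Candidate picked = findBest(matching, all, true);
    System.out.println("preempt on " + picked.node.name); // prints node2
  }
}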