From dd471425652f670786bafa03537526c19180d5ee Mon Sep 17 00:00:00 2001
From: Wangda Tan
Date: Thu, 23 Nov 2017 20:18:22 -0800
Subject: [PATCH] YARN-7496. CS Intra-queue preemption user-limit calculations
 are not in line with LeafQueue user-limit calculations. (Eric Payne via
 wangda)

Change-Id: I4dada78a227408a1f2d9bc18099041aad81d67d7
---
 .../scheduler/capacity/LeafQueue.java         | 37 +++----
 ...tyPreemptionPolicyIntraQueueUserLimit.java | 42 ++++++++
 .../scheduler/capacity/TestLeafQueue.java     | 97 +++++++++++++++++++
 3 files changed, 152 insertions(+), 24 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 40b7e0e1a43..3915b1e782d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -142,7 +142,6 @@ public class LeafQueue extends AbstractCSQueue {
   private Set<String> activeUsersSet =
       Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
   private float activeUsersTimesWeights = 0.0f;
-  private float allUsersTimesWeights = 0.0f;
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
@@ -307,7 +306,6 @@ public class LeafQueue extends AbstractCSQueue {
       ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
     }
     activeUsersTimesWeights = sumActiveUsersTimesWeights();
-    allUsersTimesWeights = sumAllUsersTimesWeights();
   }
 
   /**
@@ -450,7 +448,6 @@ public class LeafQueue extends AbstractCSQueue {
       user = new User(userName);
       users.put(userName, user);
       user.setWeight(getUserWeightFromQueue(userName));
-      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
     return user;
   }
@@ -871,7 +868,6 @@ public class LeafQueue extends AbstractCSQueue {
     user.finishApplication(wasActive);
     if (user.getTotalApplications() == 0) {
       users.remove(application.getUser());
-      allUsersTimesWeights = sumAllUsersTimesWeights();
     }
 
     // Check if we can activate more applications
@@ -1298,18 +1294,20 @@ public class LeafQueue extends AbstractCSQueue {
     // Also, the queue's configured capacity should be higher than
     // queue-hard-limit * ulMin
 
-    float usersSummedByWeight;
-    if (forActive) {
-      if (activeUsersManager.getActiveUsersChanged()) {
-        activeUsersSet = activeUsersManager.getActiveUsersSet();
-        activeUsersTimesWeights = sumActiveUsersTimesWeights();
-        activeUsersManager.clearActiveUsersChanged();
-      }
-      usersSummedByWeight = activeUsersTimesWeights;
-    } else {
-      usersSummedByWeight = allUsersTimesWeights;
+    if (activeUsersManager.getActiveUsersChanged()) {
+      activeUsersSet = activeUsersManager.getActiveUsersSet();
+      activeUsersTimesWeights = sumActiveUsersTimesWeights();
+      activeUsersManager.clearActiveUsersChanged();
     }
-
+    float usersSummedByWeight = activeUsersTimesWeights;
+
+    // Align the preemption calculation with the assignment calculation.
+    // If calculating the limit for preemption and the user is not active,
+    // calculate the limit as if containers will be preempted on the user's
+    // behalf (since receiving those resources will make the user active).
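+    // For example, with two active users of weight 1.0, computing the
+    // preemption limit for a third, inactive user of weight 1.0 uses a
+    // divisor of 3.0 instead of 2.0, the same divisor LeafQueue will use
+    // once preemption makes that user active.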
+    if (!forActive && !activeUsersSet.contains(userName)) {
+      usersSummedByWeight = activeUsersTimesWeights + user.getWeight();
+    }
+
     // User limit resource is determined by:
     // max(currentCapacity / #activeUsers, currentCapacity *
     // user-limit-percentage%)
@@ -1394,15 +1392,6 @@ public class LeafQueue extends AbstractCSQueue {
     }
     return count;
   }
-
-  synchronized float sumAllUsersTimesWeights() {
-    float count = 0.0f;
-    for (String userName : users.keySet()) {
-      User user = getUser(userName);
-      count += user.getWeight();
-    }
-    return count;
-  }
 
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
index 0440db321b1..006c3f70dd2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
@@ -931,4 +931,46 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
+
+  @Test
+  public void testSimpleUserLimitIntraQueuePreemptionNoOverPreemption()
+      throws IOException {
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 1 0]);" + // root
+        "-a(=[100 100 100 1 0])"; // a
+
+    // When preempting within a queue, it should preempt no more than what the
+    // LeafQueue will then re-assign. In this use case, containers will be
+    // preempted from app1, which will cause the users of app1 and app3 to be
+    // active. LeafQueue will calculate MULP to be 50% because
+    // (100 resources / 2 active users) = 50. We expect 14 containers to be
+    // preempted.
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+    // \tMULP
+    String appsConfig =
+        "a\t" // app1 in a
+            + "(1,1,n1,,65,false,0,user1)\t50;" +
+        "a\t" // app2 in a
+            + "(1,1,n1,,35,false,0,user2)\t50;" +
+        "a\t" // app3 in a
+            + "(1,1,n1,,0,false,35,user3)\t50"
+        ;
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // app2 is right at its user limit, so all of the preemption should come
+    // from app1, which is over its user limit.
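+    // app1 holds 65 containers and its preemption user limit works out to
+    // 51 (50 plus the one-container buffer), so 65 - 51 = 14 containers
+    // should be preempted from app1.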
+ verify(mDisp, times(14)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index e4fe03d9cd6..23fb29d9e19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -3503,4 +3503,101 @@ public class TestLeafQueue { assertEquals(1*GB, app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize()); } + + @Test + public void testGetResourceLimitForAllUsers() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + // Set minimum-user-limit-percent for queue "a" so 3 users can be active. + csConf.setUserLimit(a.getQueuePath(), 33); + // Make sure a single user can consume the entire cluster. + csConf.setUserLimitFactor(a.getQueuePath(), 15); + csConf.setMaximumCapacity(a.getQueuePath(), 100); + + when(csContext.getClusterResource()) + .thenReturn(Resources.createResource(100 * GB, 192)); + a.reinitialize(a, csContext.getClusterResource()); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + final String user_2 = "user_2"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_1, user_1); // different user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_2, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_2, user_2); // different user + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 100*GB); + + final int numNodes = 1; + Resource clusterResource = + Resources.createResource(numNodes * (100*GB), numNodes * 128); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // user_0 consumes 65% of the queue + Priority priority = TestUtils.createMockPriority(1); + for (int i=0; i < 65; i++) { + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + } + assertEquals(65*GB, app_0.getCurrentConsumption().getMemorySize()); + + // When the minimum user limit percent is set to 33%, the capacity scheduler + // will try to 
assign 35% of resources to each user. This is because the
+    // capacity scheduler leaves a slight buffer for each user.
+    for (int i=0; i < 35; i++) {
+      app_1.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+              priority, recordFactory)));
+
+      a.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    }
+    assertEquals(35*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0, a.getActiveUsersManager().getNumActiveUsers());
+
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 35, true,
+            priority, recordFactory)));
+    a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+
+    // With one active user that is over its user limit and requesting more
+    // resources (user_0), one user exactly at its user limit guarantee
+    // (user_1), and one inactive user (user_2) whose limit is being
+    // calculated for preemption, the user limit should come out to 50% of
+    // the resources. Since the capacity scheduler will leave a buffer of 1
+    // container, 51GB should be the amount of resources calculated for
+    // preemption.
+    Resource ulForAllUsers = a.getResourceLimitForAllUsers(user_2,
+        clusterResource, "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(51*GB, ulForAllUsers.getMemorySize());
+  }
 }
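For reference, a minimal sketch of the user-limit arithmetic this patch aligns between LeafQueue assignment and intra-queue preemption. It is illustrative only: the class, method, and parameter names below are hypothetical, and the real calculation in LeafQueue also factors in minimum-user-limit-percent, user-limit-factor, and resource normalization, which are omitted here.

public final class UserLimitSketch {
  // Divisor logic after YARN-7496: when calculating for preemption
  // (forActive == false), an inactive user's weight is added to the sum of
  // the active users' weights, because preempting on that user's behalf
  // will make it active.
  static float userLimit(float queueResources, float activeUsersTimesWeights,
      float userWeight, boolean userIsActive, boolean forActive,
      float containerSize) {
    float usersSummedByWeight = activeUsersTimesWeights;
    if (!forActive && !userIsActive) {
      usersSummedByWeight += userWeight;
    }
    // One-container buffer, mirroring the 51GB expectation in
    // testGetResourceLimitForAllUsers above.
    return queueResources / usersSummedByWeight + containerSize;
  }

  public static void main(String[] args) {
    // Numbers from testGetResourceLimitForAllUsers: a 100GB queue, one
    // active user (user_0, weight 1.0), the inactive user_2 (weight 1.0),
    // and 1GB containers: 100 / 2 + 1 = 51.
    System.out.println(userLimit(100f, 1.0f, 1.0f, false, false, 1f));
  }
}

With the numbers from the intra-queue preemption test, the same arithmetic leaves app1 holding 51 of its 65 containers, which is where the expectation of 14 preempted containers comes from.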