From a39617df63e758629123da50d0690f3c13657eca Mon Sep 17 00:00:00 2001
From: Wangda Tan <wangda@apache.org>
Date: Wed, 19 Jul 2017 15:29:45 -0700
Subject: [PATCH] YARN-6775. CapacityScheduler: Improvements to
 assignContainers, avoid unnecessary canAssignToUser/Queue calls. (Nathan
 Roberts via wangda)

Change-Id: I5951f0997547de7d2e4a30b4ad87ab0a59b3066a
---
 .../scheduler/capacity/LeafQueue.java         |  93 ++++++++---
 .../capacity/TestCapacityScheduler.java       | 145 ++++++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java     |  10 +-
 .../scheduler/capacity/TestReservations.java  |   6 +-
 4 files changed, 221 insertions(+), 33 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 7258aee93d2..bd3bdff1439 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -956,25 +956,56 @@ public class LeafQueue extends AbstractCSQueue {
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
+    Map<String, CachedUserLimit> userLimits = new HashMap<>();
+    boolean needAssignToQueueCheck = true;
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
         orderingPolicy.getAssignmentIterator(); assignmentIterator
         .hasNext(); ) {
       FiCaSchedulerApp application = assignmentIterator.next();
 
       // Check queue max-capacity limit
-      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-          currentResourceLimits, application.getCurrentReservation(),
-          schedulingMode)) {
-        return CSAssignment.NULL_ASSIGNMENT;
+      Resource appReserved = application.getCurrentReservation();
+      if (needAssignToQueueCheck) {
+        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+            currentResourceLimits, appReserved, schedulingMode)) {
+          return CSAssignment.NULL_ASSIGNMENT;
+        }
+        // If there was no reservation and canAssignToThisQueue returned
+        // true, there is no reason to check further.
+        if (!this.reservationsContinueLooking
+            || appReserved.equals(Resources.none()) || !node.getPartition()
+            .equals(CommonNodeLabelsManager.NO_LABEL)) {
+          needAssignToQueueCheck = false;
+        }
       }
 
+      CachedUserLimit cul = userLimits.get(application.getUser());
+      Resource cachedUserLimit = null;
+      if (cul != null) {
+        cachedUserLimit = cul.userLimit;
+      }
       Resource userLimit =
           computeUserLimitAndSetHeadroom(application, clusterResource,
-              node.getPartition(), schedulingMode);
+              node.getPartition(), schedulingMode, cachedUserLimit);
+      if (cul == null) {
+        cul = new CachedUserLimit(userLimit);
+        userLimits.put(application.getUser(), cul);
+      }
 
       // Check user limit
-      if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-          application, node.getPartition(), currentResourceLimits)) {
+      boolean userAssignable = true;
+      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
+        userAssignable = false;
+      } else {
+        userAssignable =
+            canAssignToUser(clusterResource, application.getUser(), userLimit,
+                appReserved, node.getPartition(), currentResourceLimits);
+        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
+          cul.canAssign = false;
+          cul.reservation = appReserved;
+        }
+      }
+      if (!userAssignable) {
         application.updateAMContainerDiagnostics(AMState.ACTIVATED,
             "User capacity has reached its maximum limit.");
         continue;
@@ -1113,19 +1144,21 @@ public class LeafQueue extends AbstractCSQueue {
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
       Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, Resource userLimit) {
     String user = application.getUser();
     User queueUser = getUser(user);
 
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
-    Resource userLimit =
+    if (userLimit == null) {
+      userLimit =
         computeUserLimit(application.getUser(), clusterResource, queueUser,
             nodePartition, schedulingMode, true);
-
+    }
     setQueueResourceLimitsInfo(clusterResource);
 
     Resource headroom =
+        metrics.getUserMetrics(user) == null ? Resources.none() :
         getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
             clusterResource, userLimit, nodePartition);
@@ -1133,8 +1166,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("Headroom calculation for user " + user + ": " +
           " userLimit=" + userLimit + " queueMaxAvailRes="
           + cachedResourceLimitsForHeadroom.getLimit() +
-          " consumed=" + queueUser.getUsed() +
-          " headroom=" + headroom);
+          " consumed=" + queueUser.getUsed());
     }
 
     CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
@@ -1289,36 +1321,37 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
-      String userName, Resource limit, FiCaSchedulerApp application,
+      String userName, Resource limit, Resource rsrv,
       String nodePartition, ResourceLimits currentResourceLimits) {
     User user = getUser(userName);
-
+    Resource used = user.getUsed(nodePartition);
     currentResourceLimits.setAmountNeededUnreserve(Resources.none());
 
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
     if (Resources
         .greaterThan(resourceCalculator, clusterResource,
-            user.getUsed(nodePartition),
+            used,
             limit)) {
 
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking &&
-          nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
+      if (this.reservationsContinueLooking && !rsrv.equals(Resources.none())
+          && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
+
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getUsed(),
-                application.getCurrentReservation()), limit)) {
+            Resources.subtract(used,
+                rsrv), limit)) {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("User " + userName + " in queue " + getQueueName()
                 + " will exceed limit based on reservations - "
                 + " consumed: "
-                + user.getUsed() + " reserved: "
-                + application.getCurrentReservation() + " limit: " + limit);
+                + used + " reserved: "
+                + rsrv + " limit: " + limit);
           }
 
           Resource amountNeededToUnreserve =
-              Resources.subtract(user.getUsed(nodePartition), limit);
+              Resources.subtract(used, limit);
           // we can only acquire a new container if we unreserve first to
           // respect user-limit
           currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
@@ -1328,7 +1361,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName()
             + " will exceed limit - " + " consumed: "
-            + user.getUsed(nodePartition) + " limit: " + limit);
+            + used + " limit: " + limit);
       }
       return false;
     }
@@ -1623,7 +1656,7 @@ public class LeafQueue extends AbstractCSQueue {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
             RMNodeLabelsManager.NO_LABEL,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
       }
     }
   }
@@ -1733,7 +1766,7 @@ public class LeafQueue extends AbstractCSQueue {
     public ResourceUsage getResourceUsage() {
       return userResourceUsage;
     }
-    
+
    public synchronized float resetAndUpdateUsageRatio(
        ResourceCalculator resourceCalculator,
        Resource resource, String nodePartition) {
@@ -2109,6 +2142,16 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
+  static class CachedUserLimit {
+    final Resource userLimit;
+    boolean canAssign = true;
+    Resource reservation = Resources.none();
+
+    CachedUserLimit(Resource userLimit) {
+      this.userLimit = userLimit;
+    }
+  }
+
   /**
    * Get all valid users in this queue.
    * @return user list
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 15fe2b4ad6b..a663bdb9775 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -145,8 +146,12 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -155,6 +160,8 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import java.util.Enumeration;
+import java.util.PriorityQueue;
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -3023,6 +3030,7 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  @Test
   public void testHeadRoomCalculationWithDRC() throws Exception {
     // test with total cluster resource of 20GB memory and 20 vcores.
@@ -3570,6 +3578,143 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  @Test (timeout = 300000)
+  public void testUserLimitThroughput() throws Exception {
+    // Since this is more of a performance unit test, only run if
+    // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
+    Assume.assumeTrue(Boolean.valueOf(
+        System.getProperty("RunUserLimitThroughput")));
+
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
+    csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
+    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
+        100.0f);
+    csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue qb = (LeafQueue)cs.getQueue("default");
+
+    // For now make user limit large so we can activate all applications
+    qb.setUserLimitFactor((float)100.0);
+    qb.setupConfigurableCapacities();
+
+    SchedulerEvent addAppEvent;
+    SchedulerEvent addAttemptEvent;
+    Container container = mock(Container.class);
+    ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+
+    final int appCount = 100;
+    ApplicationId[] appids = new ApplicationId[appCount];
+    RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
+    ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
+    RMAppImpl[] apps = new RMAppImpl[appCount];
+    RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
+    for (int i=0; i<appCount; i++) {
+      appids[i] = BuilderUtils.newApplicationId(100, i);
+      appAttemptIds[i] =
+          BuilderUtils.newApplicationAttemptId(appids[i], 1);
+
+      attemptMetrics[i] =
+          new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
+      apps[i] = mock(RMAppImpl.class);
+      when(apps[i].getApplicationId()).thenReturn(appids[i]);
+      attempts[i] = mock(RMAppAttemptImpl.class);
+      when(attempts[i].getMasterContainer()).thenReturn(container);
+      when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
+      when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
+      when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
+      when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
+
+      rm.getRMContext().getRMApps().put(appids[i], apps[i]);
+      addAppEvent = new AppAddedSchedulerEvent(appids[i], "default", "user1");
+      cs.handle(addAppEvent);
+      addAttemptEvent = new AppAttemptAddedSchedulerEvent(appAttemptIds[i],
+          false);
+      cs.handle(addAttemptEvent);
+    }
+
+    // add nodes to cluster, so cluster has 20GB and 20 vcores
+    Resource newResource = Resource.newInstance(10 * GB, 10);
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+    cs.handle(new NodeAddedSchedulerEvent(node));
+
+    Resource newResource2 = Resource.newInstance(10 * GB, 10);
+    RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2");
+    cs.handle(new NodeAddedSchedulerEvent(node2));
+
+    Priority u0Priority = TestUtils.createMockPriority(1);
+    RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(null);
+
+    FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
+    for (int i=0; i<appCount; i++) {
+      fiCaApps[i] =
+          cs.getSchedulerApplications().get(apps[i].getApplicationId())
+              .getCurrentAppAttempt();
+      // request a container with 1GB memory and 1 vcore
+      fiCaApps[i].updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1,
+              true, u0Priority, recordFactory)));
+    }
+    // Now force everything to be over user limit
+    qb.setUserLimitFactor((float)0.0);
+
+    // Quiet the loggers while measuring throughput
+    for (Enumeration<?> loggers=LogManager.getCurrentLoggers();
+        loggers.hasMoreElements(); ) {
+      Logger logger = (Logger) loggers.nextElement();
+      logger.setLevel(Level.WARN);
+    }
+    final int topn = 20;
+    final int iterations = 2000000;
+    final int printInterval = 20000;
+    final float numerator = 1000.0f * printInterval;
+    PriorityQueue<Long> queue = new PriorityQueue<>(topn,
+        Collections.reverseOrder());
+
+    long n = Time.monotonicNow();
+    long timespent = 0;
+    for (int i = 0; i < iterations; i+=2) {
+      if (i > 0 && i % printInterval == 0){
+        long ts = (Time.monotonicNow() - n);
+        if (queue.size() < topn) {
+          queue.offer(ts);
+        } else {
+          Long last = queue.peek();
+          if (last > ts) {
+            queue.poll();
+            queue.offer(ts);
+          }
+        }
+        System.out.println(i + " " + (numerator / ts));
+        n= Time.monotonicNow();
+      }
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+      cs.handle(new NodeUpdateSchedulerEvent(node2));
+    }
+    timespent=0;
+    int entries = queue.size();
+    while(queue.size() > 0){
+      long l = queue.poll();
+      timespent += l;
+    }
+    System.out.println("Avg of fastest " + entries + ": "
+        + numerator / (timespent / entries));
+    rm.stop();
+  }
+
   @Test
   public void testCSQueueBlocked() throws Exception {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 0eccdfc7747..0d0d78befa9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -932,7 +932,7 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
     //maxqueue 16G, userlimit 13G, - 4G used = 9G
     assertEquals(9*GB,app_0.getHeadroom().getMemorySize());
@@ -951,7 +951,7 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_1, new ResourceLimits(
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
 
     assertEquals(8*GB, qb.getUsedResources().getMemorySize());
     assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
@@ -995,7 +995,7 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
     assertEquals(4*GB, qb.getUsedResources().getMemorySize());
     //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
     assertEquals(5*GB, app_3.getHeadroom().getMemorySize());
@@ -1013,9 +1013,9 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_1, new ResourceLimits(
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
 
     //app3 is user1, active from last test case
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index d10aa814086..4d6871d4249 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -1207,7 +1207,7 @@ public class TestReservations {
     // not over the limit
     Resource limit = Resources.createResource(14 * GB, 0);
     ResourceLimits userResourceLimits = new ResourceLimits(clusterResource);
-    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
+    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0.getCurrentReservation(), "", userResourceLimits);
     assertTrue(res);
     assertEquals(Resources.none(),
         userResourceLimits.getAmountNeededUnreserve());
@@ -1215,7 +1215,7 @@ public class TestReservations {
     // set limit so it subtracts reservations and it can continue
     limit = Resources.createResource(12 * GB, 0);
     userResourceLimits = new ResourceLimits(clusterResource);
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0.getCurrentReservation(),
         "", userResourceLimits);
     assertTrue(res);
     // limit set to 12GB, we are using 13GB (8 allocated, 5 reserved), to get under limit
@@ -1228,7 +1228,7 @@ public class TestReservations {
     userResourceLimits = new ResourceLimits(clusterResource);
 
     // should now return false since feature off
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0.getCurrentReservation(), "", userResourceLimits);
     assertFalse(res);
     assertEquals(Resources.none(),
         userResourceLimits.getAmountNeededUnreserve());
   }
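
--

Commentary on the optimization. Within a single assignContainers() pass over the ordered applications, the patch avoids repeated expensive work three ways: computeUserLimit() runs at most once per user (memoized in the CachedUserLimit map), a negative canAssignToUser() answer is cached together with the reservation it was computed against so later apps of the same user whose reservation is no larger can skip the call entirely, and the queue-level canAssignToThisQueue() re-check is skipped for the rest of the pass once it has passed with no reservation in play. The sketch below distills the per-user caching pattern only. It is an illustration, not the real scheduler code: the App class, the plain long values standing in for Resource, and the computeUserLimit/canAssignToUser bodies are all simplified stand-ins.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UserLimitCacheSketch {

  // Mirrors LeafQueue.CachedUserLimit: one entry per user, per pass.
  static final class CachedUserLimit {
    final long userLimit;      // memoized result of the expensive call
    boolean canAssign = true;  // last canAssignToUser outcome for this user
    long reservation = 0;      // reservation the negative outcome was based on

    CachedUserLimit(long userLimit) {
      this.userLimit = userLimit;
    }
  }

  static final class App {
    final String user;
    final long used;        // resources this user currently consumes
    final long reservation; // this app's reserved (not yet usable) resources

    App(String user, long used, long reservation) {
      this.user = user;
      this.used = used;
      this.reservation = reservation;
    }
  }

  // Stand-in for the expensive computeUserLimit() walk of queue state.
  static long computeUserLimit(String user) {
    return 10;
  }

  // Crude stand-in for canAssignToUser(): reservations are subtracted,
  // mirroring the reservations-continue-looking path of the real check.
  static boolean canAssignToUser(App app, long limit) {
    return app.used - app.reservation < limit;
  }

  // One pass over the apps, computing each user's limit at most once.
  static void assignContainers(List<App> apps) {
    Map<String, CachedUserLimit> userLimits = new HashMap<>();
    for (App app : apps) {
      CachedUserLimit cul = userLimits.get(app.user);
      if (cul == null) {
        cul = new CachedUserLimit(computeUserLimit(app.user)); // once per user
        userLimits.put(app.user, cul);
      }
      boolean assignable;
      if (!cul.canAssign && app.reservation <= cul.reservation) {
        // An earlier app of this user already failed with at least this much
        // reserved; a smaller or equal reservation cannot flip the answer.
        assignable = false;
      } else {
        assignable = canAssignToUser(app, cul.userLimit);
        if (!assignable && cul.reservation <= app.reservation) {
          cul.canAssign = false;           // remember the negative outcome
          cul.reservation = app.reservation;
        }
      }
      System.out.println(app.user + " used=" + app.used
          + " assignable=" + assignable);
    }
  }

  public static void main(String[] args) {
    assignContainers(Arrays.asList(
        new App("alice", 8, 0),    // under the limit of 10
        new App("alice", 12, 0),   // over limit, negative result cached
        new App("alice", 12, 0),   // cache hit, canAssignToUser skipped
        new App("bob", 2, 0)));    // different user, fresh computation
  }
}

The new throughput test guards itself behind a system property and is skipped otherwise, so it has to be requested explicitly. Assuming the standard Maven Surefire test filter, something like the following should run it:

mvn test -Dtest=TestCapacityScheduler#testUserLimitThroughput -DRunUserLimitThroughput=true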