diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java index 3f8ed5552a4..12aff02ca10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -63,9 +63,14 @@ public class ActivitiesLogger { SchedulerApplicationAttempt application, Priority priority, String diagnostic) { String type = "app"; - recordActivity(activitiesManager, node, application.getQueueName(), - application.getApplicationId().toString(), priority, - ActivityState.REJECTED, diagnostic, type); + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + recordActivity(activitiesManager, node, application.getQueueName(), + application.getApplicationId().toString(), priority, + ActivityState.REJECTED, diagnostic, type); + } finishSkippedAppAllocationRecording(activitiesManager, application.getApplicationId(), ActivityState.REJECTED, diagnostic); } @@ -203,8 +208,13 @@ public class ActivitiesLogger { public static void recordQueueActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentQueueName, String queueName, ActivityState state, String diagnostic) { - recordActivity(activitiesManager, node, parentQueueName, queueName, null, - state, diagnostic, null); + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + recordActivity(activitiesManager, node, parentQueueName, queueName, + null, state, diagnostic, null); + } } } @@ -266,13 +276,10 @@ public class ActivitiesLogger { private static void recordActivity(ActivitiesManager activitiesManager, SchedulerNode node, String parentName, String childName, Priority priority, ActivityState state, String diagnostic, String type) { - if (activitiesManager == null) { - return; - } - if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { - activitiesManager.addSchedulingActivityForNode(node.getNodeID(), - parentName, childName, priority != null ? priority.toString() : null, - state, diagnostic, type); - } + + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), parentName, + childName, priority != null ? priority.toString() : null, state, + diagnostic, type); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 013a5ac720f..2e502b77677 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1026,6 +1026,8 @@ public class LeafQueue extends AbstractCSQueue { return CSAssignment.NULL_ASSIGNMENT; } + Map userLimits = new HashMap<>(); + boolean needAssignToQueueCheck = true; for (Iterator assignmentIterator = orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext(); ) { @@ -1035,24 +1037,50 @@ public class LeafQueue extends AbstractCSQueue { node.getNodeID(), SystemClock.getInstance().getTime(), application); // Check queue max-capacity limit - if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(), - currentResourceLimits, application.getCurrentReservation(), - schedulingMode)) { - ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( - activitiesManager, node, application, application.getPriority(), - ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); - ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, - getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, - ActivityDiagnosticConstant.EMPTY); - return CSAssignment.NULL_ASSIGNMENT; + Resource appReserved = application.getCurrentReservation(); + if (needAssignToQueueCheck) { + if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), + currentResourceLimits, appReserved, schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + return CSAssignment.NULL_ASSIGNMENT; + } + // If there was no reservation and canAssignToThisQueue returned + // true, there is no reason to check further. + if (!this.reservationsContinueLooking + || appReserved.equals(Resources.none())) { + needAssignToQueueCheck = false; + } } + CachedUserLimit cul = userLimits.get(application.getUser()); + Resource cachedUserLimit = null; + if (cul != null) { + cachedUserLimit = cul.userLimit; + } Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, ps.getPartition(), schedulingMode); - + clusterResource, ps.getPartition(), schedulingMode, cachedUserLimit); + if (cul == null) { + cul = new CachedUserLimit(userLimit); + userLimits.put(application.getUser(), cul); + } // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, ps.getPartition(), currentResourceLimits)) { + boolean userAssignable = true; + if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) { + userAssignable = false; + } else { + userAssignable = canAssignToUser(clusterResource, application.getUser(), + userLimit, application, node.getPartition(), currentResourceLimits); + if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) { + cul.canAssign = false; + cul.reservation = appReserved; + } + } + if (!userAssignable) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( @@ -1127,7 +1155,7 @@ public class LeafQueue extends AbstractCSQueue { // check user-limit Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p, - allocation.getSchedulingMode()); + allocation.getSchedulingMode(), null); // Deduct resources that we can release Resource usedResource = Resources.clone(getUser(username).getUsed(p)); @@ -1332,19 +1360,20 @@ public class LeafQueue extends AbstractCSQueue { @Lock({LeafQueue.class}) Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, Resource clusterResource, String nodePartition, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, Resource userLimit) { String user = application.getUser(); User queueUser = getUser(user); // Compute user limit respect requested labels, // TODO, need consider headroom respect labels also - Resource userLimit = - getResourceLimitForActiveUsers(application.getUser(), clusterResource, - nodePartition, schedulingMode); - + if (userLimit == null) { + userLimit = getResourceLimitForActiveUsers(application.getUser(), + clusterResource, nodePartition, schedulingMode); + } setQueueResourceLimitsInfo(clusterResource); Resource headroom = + metrics.getUserMetrics(user) == null ? Resources.none() : getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, userLimit, nodePartition); @@ -1352,7 +1381,7 @@ public class LeafQueue extends AbstractCSQueue { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed=" - + queueUser.getUsed() + " headroom=" + headroom + " partition=" + + queueUser.getUsed() + " partition=" + nodePartition); } @@ -1713,7 +1742,7 @@ public class LeafQueue extends AbstractCSQueue { .getSchedulableEntities()) { computeUserLimitAndSetHeadroom(application, clusterResource, RMNodeLabelsManager.NO_LABEL, - SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); } } finally { writeLock.unlock(); @@ -2052,4 +2081,14 @@ public class LeafQueue extends AbstractCSQueue { public Set getAllUsers() { return this.getUsersManager().getUsers().keySet(); } + + static class CachedUserLimit { + final Resource userLimit; + boolean canAssign = true; + Resource reservation = Resources.none(); + + CachedUserLimit(Resource userLimit) { + this.userLimit = userLimit; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ba0f906c1a9..41a7ce81868 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -33,9 +33,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; @@ -50,6 +52,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -90,7 +93,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; import org.apache.hadoop.yarn.server.resourcemanager.Application; -import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -156,8 +158,12 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -3492,6 +3498,7 @@ public class TestCapacityScheduler { rm.stop(); } + @Test public void testHeadRoomCalculationWithDRC() throws Exception { // test with total cluster resource of 20GB memory and 20 vcores. @@ -4074,6 +4081,143 @@ public class TestCapacityScheduler { rm.stop(); } + @Test (timeout = 300000) + public void testUserLimitThroughput() throws Exception { + // Since this is more of a performance unit test, only run if + // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true) + Assume.assumeTrue(Boolean.valueOf( + System.getProperty("RunUserLimitThroughput"))); + + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f); + csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", + 100.0f); + csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f); + csconf.setResourceComparator(DominantResourceCalculator.class); + + YarnConfiguration conf = new YarnConfiguration(csconf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue qb = (LeafQueue)cs.getQueue("default"); + + // For now make user limit large so we can activate all applications + qb.setUserLimitFactor((float)100.0); + qb.setupConfigurableCapacities(); + + SchedulerEvent addAppEvent; + SchedulerEvent addAttemptEvent; + Container container = mock(Container.class); + ApplicationSubmissionContext submissionContext = + mock(ApplicationSubmissionContext.class); + + final int appCount = 100; + ApplicationId[] appids = new ApplicationId[appCount]; + RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount]; + ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount]; + RMAppImpl[] apps = new RMAppImpl[appCount]; + RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount]; + for (int i=0; i loggers=LogManager.getCurrentLoggers(); + loggers.hasMoreElements(); ) { + Logger logger = (Logger) loggers.nextElement(); + logger.setLevel(Level.WARN); + } + final int topn = 20; + final int iterations = 2000000; + final int printInterval = 20000; + final float numerator = 1000.0f * printInterval; + PriorityQueue queue = new PriorityQueue<>(topn, + Collections.reverseOrder()); + + long n = Time.monotonicNow(); + long timespent = 0; + for (int i = 0; i < iterations; i+=2) { + if (i > 0 && i % printInterval == 0){ + long ts = (Time.monotonicNow() - n); + if (queue.size() < topn) { + queue.offer(ts); + } else { + Long last = queue.peek(); + if (last > ts) { + queue.poll(); + queue.offer(ts); + } + } + System.out.println(i + " " + (numerator / ts)); + n= Time.monotonicNow(); + } + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + } + timespent=0; + int entries = queue.size(); + while(queue.size() > 0){ + long l = queue.poll(); + timespent += l; + } + System.out.println("Avg of fastest " + entries + ": " + + numerator / (timespent / entries)); + rm.stop(); + } + @Test public void testCSQueueBlocked() throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 4417132a774..2864d7fd7f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1146,7 +1146,7 @@ public class TestLeafQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); //maxqueue 16G, userlimit 13G, - 4G used = 9G assertEquals(9*GB,app_0.getHeadroom().getMemorySize()); @@ -1169,7 +1169,7 @@ public class TestLeafQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); assertEquals(8*GB, qb.getUsedResources().getMemorySize()); assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize()); @@ -1219,7 +1219,7 @@ public class TestLeafQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); assertEquals(4*GB, qb.getUsedResources().getMemorySize()); //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both) assertEquals(5*GB, app_3.getHeadroom().getMemorySize()); @@ -1240,9 +1240,9 @@ public class TestLeafQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), qb, nodes, apps); qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, - "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null); //app3 is user1, active from last test case