diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ca67fcb995f..99b4d92fde3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -651,6 +651,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files. (Arun C Murthy via sseth) + MAPREDUCE-3752. Modified application limits to include queue max-capacities + besides the usual user limits. (Arun C Murthy via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java index 428246d485d..4f41bfde8a0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java @@ -111,4 +111,12 @@ public class Resources { public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) { return lhs.getMemory() >= rhs.getMemory(); } + + public static Resource min(Resource lhs, Resource rhs) { + return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs; + } + + public static Resource max(Resource lhs, Resource rhs) { + return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 2040505be67..0489ab287eb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -162,6 +162,13 @@ public class AppSchedulingInfo { asks.put(hostName, request); if (updatePendingResources) { + + // Similarly, deactivate application? + if (request.getNumContainers() <= 0) { + LOG.info("checking for deactivate... "); + checkForDeactivation(); + } + int lastRequestContainers = lastRequest != null ? lastRequest .getNumContainers() : 0; Resource lastRequestCapability = lastRequest != null ? lastRequest @@ -308,19 +315,24 @@ public class AppSchedulingInfo { // Do we have any outstanding requests? // If there is nothing, we need to deactivate this application if (numOffSwitchContainers == 0) { - boolean deactivate = true; - for (Priority priority : getPriorities()) { - ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY); - if (request.getNumContainers() > 0) { - deactivate = false; - break; - } - } - if (deactivate) { - activeUsersManager.deactivateApplication(user, applicationId); - } + checkForDeactivation(); } } + + synchronized private void checkForDeactivation() { + boolean deactivate = true; + for (Priority priority : getPriorities()) { + ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY); + if (request.getNumContainers() > 0) { + deactivate = false; + break; + } + } + if (deactivate) { + activeUsersManager.deactivateApplication(user, applicationId); + } + } + synchronized private void allocate(Container container) { // Update consumption and track allocations //TODO: fixme sharad diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 08ee09463a0..c107093ea9e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -751,15 +751,15 @@ public class LeafQueue implements CSQueue { continue; } - // Compute & set headroom - // Note: We set the headroom with the highest priority request - // as the target. + // Compute user-limit & set headroom + // Note: We compute both user-limit & headroom with the highest + // priority request as the target. // This works since we never assign lower priority requests // before all higher priority ones are serviced. Resource userLimit = - computeAndSetUserResourceLimit(application, clusterResource, - required); - + computeUserLimitAndSetHeadroom(application, clusterResource, + required); + // Check queue max-capacity limit if (!assignToQueue(clusterResource, required)) { return NULL_ASSIGNMENT; @@ -777,13 +777,13 @@ public class LeafQueue implements CSQueue { CSAssignment assignment = assignContainersOnNode(clusterResource, node, application, priority, null); - - Resource assigned = assignment.getResource(); - + // Did we schedule or reserve a container? + Resource assigned = assignment.getResource(); if (Resources.greaterThan(assigned, Resources.none())) { - // Book-keeping + // Book-keeping + // Note: Update headroom to account for current allocation too... allocateResource(clusterResource, application, assigned); // Reset scheduling opportunities @@ -854,20 +854,50 @@ public class LeafQueue implements CSQueue { } @Lock({LeafQueue.class, SchedulerApp.class}) - private Resource computeAndSetUserResourceLimit(SchedulerApp application, - Resource clusterResource, Resource required) { + private Resource computeUserLimitAndSetHeadroom( + SchedulerApp application, Resource clusterResource, Resource required) { + String user = application.getUser(); - Resource limit = computeUserLimit(application, clusterResource, required); + + /** + * Headroom is min((userLimit, queue-max-cap) - consumed) + */ + + Resource userLimit = // User limit + computeUserLimit(application, clusterResource, required); + + + Resource queueMaxCap = // Queue Max-Capacity + Resources.createResource( + roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory())) + ); + + Resource userConsumed = getUser(user).getConsumedResources(); Resource headroom = - Resources.subtract(limit, getUser(user).getConsumedResources()); + Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed); + + if (LOG.isDebugEnabled()) { + LOG.debug("Headroom calculation for user " + user + ": " + + " userLimit=" + userLimit + + " queueMaxCap=" + queueMaxCap + + " consumed=" + userConsumed + + " headroom=" + headroom); + } + application.setHeadroom(headroom); metrics.setAvailableResourcesToUser(user, headroom); - return limit; + + return userLimit; } private int roundUp(int memory) { - return divideAndCeil(memory, minimumAllocation.getMemory()) * - minimumAllocation.getMemory(); + int minMemory = minimumAllocation.getMemory(); + return divideAndCeil(memory, minMemory) * minMemory; + } + + private int roundDown(int memory) { + int minMemory = minimumAllocation.getMemory(); + return (memory / minMemory) * minMemory; } @Lock(NoLock.class) @@ -1288,10 +1318,17 @@ public class LeafQueue implements CSQueue { String userName = application.getUser(); User user = getUser(userName); user.assignContainer(resource); + Resources.subtractFrom(application.getHeadroom(), resource); // headroom metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); - LOG.info(getQueueName() + - " used=" + usedResources + " numContainers=" + numContainers + - " user=" + userName + " user-resources=" + user.getConsumedResources()); + + if (LOG.isDebugEnabled()) { + LOG.info(getQueueName() + + " user=" + userName + + " used=" + usedResources + " numContainers=" + numContainers + + " headroom = " + application.getHeadroom() + + " user-resources=" + user.getConsumedResources() + ); + } } synchronized void releaseResource(Resource clusterResource, @@ -1325,8 +1362,8 @@ public class LeafQueue implements CSQueue { // Update application properties for (SchedulerApp application : activeApplications) { synchronized (application) { - computeAndSetUserResourceLimit( - application, clusterResource, Resources.none()); + computeUserLimitAndSetHeadroom(application, clusterResource, + Resources.none()); } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 9db0288ad58..5ccf3ba1516 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -38,6 +38,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -67,6 +69,8 @@ import org.mockito.stubbing.Answer; public class TestLeafQueue { + private static final Log LOG = LogFactory.getLog(TestLeafQueue.class); + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -472,6 +476,115 @@ public class TestLeafQueue { assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); } + @Test + public void testHeadroomWithMaxCap() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + SchedulerApp app_0 = + new SchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_0, user_0, A); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + SchedulerApp app_1 = + new SchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_1, user_0, A); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + SchedulerApp app_2 = + new SchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), rmContext, null); + a.submitApplication(app_2, user_1, A); + + // Setup some nodes + String host_0 = "host_0"; + SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "host_1"; + SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8*GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority, + recordFactory))); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority, + recordFactory))); + + /** + * Start testing... + */ + + // Set user-limit + a.setUserLimit(50); + a.setUserLimitFactor(2); + + // Now, only user_0 should be active since he is the only one with + // outstanding requests + assertEquals("There should only be 1 active user!", + 1, a.getActiveUsersManager().getNumActiveUsers()); + + // 1 container to user_0 + a.assignContainers(clusterResource, node_0); + assertEquals(2*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G + + // Again one to user_0 since he hasn't exceeded user limit yet + a.assignContainers(clusterResource, node_0); + assertEquals(3*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G + assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G + + // Submit requests for app_1 and set max-cap + a.setMaxCapacity(.1f); + app_2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority, + recordFactory))); + assertEquals(2, a.getActiveUsersManager().getNumActiveUsers()); + + // No more to user_0 since he is already over user-limit + // and no more containers to queue since it's already at max-cap + a.assignContainers(clusterResource, node_1); + assertEquals(3*GB, a.getUsedResources().getMemory()); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_0.getHeadroom().getMemory()); + assertEquals(0*GB, app_1.getHeadroom().getMemory()); + + // Check headroom for app_2 + LOG.info("here"); + app_1.updateResourceRequests(Collections.singletonList( // unset + TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority, + recordFactory))); + assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); + a.assignContainers(clusterResource, node_1); + assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + } + @Test public void testSingleQueueWithMultipleUsers() throws Exception { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 6bddf87eeb2..f3092b0637e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -86,7 +86,7 @@ public class TestParentQueue { private SchedulerApp getMockApplication(int appId, String user) { SchedulerApp application = mock(SchedulerApp.class); doReturn(user).when(application).getUser(); - doReturn(null).when(application).getHeadroom(); + doReturn(Resources.createResource(0)).when(application).getHeadroom(); return application; }