diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9a8fd932b60..ae2375908de 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1076,6 +1076,9 @@ Release 2.7.3 - UNRELEASED YARN-4374. RM capacity scheduler UI rounds user limit factor (Chang Li via jlowe) + YARN-3769. Consider user limit when calculating total pending resource for + preemption policy in Capacity Scheduler. (Eric Payne via wangda) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 77df05989d7..5df2be8402b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -872,7 +872,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic if (curQueue instanceof LeafQueue) { LeafQueue l = (LeafQueue) curQueue; Resource pending = - l.getQueueResourceUsage().getPending(partitionToLookAt); + l.getTotalPendingResourcesConsideringUserLimit( + partitionResource, partitionToLookAt); ret = new TempQueuePerPartition(queueName, current, pending, guaranteed, maxCapacity, preemptionDisabled, partitionToLookAt); if (preemptionDisabled) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ef2bf7e4333..e5ac07280b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -64,7 +64,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.resource.Resources; @@ -1515,14 +1514,35 @@ public class LeafQueue extends AbstractCSQueue { return orderingPolicy.getSchedulableEntities(); } - // return a single Resource capturing the overal amount of pending resources - public synchronized Resource getTotalResourcePending() { - Resource ret = BuilderUtils.newResource(0, 0); - for (FiCaSchedulerApp f : - orderingPolicy.getSchedulableEntities()) { - Resources.addTo(ret, f.getTotalPendingRequests()); + // Consider the headroom for each user in the queue. + // Total pending for the queue = + // sum(for each user(min((user's headroom), sum(user's pending requests)))) + // NOTE: Used for calculating pedning resources in the preemption monitor. + public synchronized Resource getTotalPendingResourcesConsideringUserLimit( + Resource resources, String partition) { + Map userNameToHeadroom = new HashMap(); + Resource pendingConsideringUserLimit = Resource.newInstance(0, 0); + for (FiCaSchedulerApp app : getApplications()) { + String userName = app.getUser(); + if (!userNameToHeadroom.containsKey(userName)) { + User user = getUser(userName); + Resource headroom = Resources.subtract( + computeUserLimit(app, resources, user, partition, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + user.getUsed(partition)); + // Make sure headroom is not negative. + headroom = Resources.componentwiseMax(headroom, Resources.none()); + userNameToHeadroom.put(userName, headroom); + } + Resource minpendingConsideringUserLimit = + Resources.componentwiseMin(userNameToHeadroom.get(userName), + app.getAppAttemptResourceUsage().getPending(partition)); + Resources.addTo(pendingConsideringUserLimit, + minpendingConsideringUserLimit); + Resources.subtractFrom( + userNameToHeadroom.get(userName), minpendingConsideringUserLimit); } - return ret; + return pendingConsideringUserLimit; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 8d9f48a8354..7a3ce568b1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -1150,7 +1150,8 @@ public class TestProportionalCapacityPreemptionPolicy { ResourceCalculator rc = mCS.getResourceCalculator(); List appAttemptIdList = new ArrayList(); - when(lq.getTotalResourcePending()).thenReturn(pending[i]); + when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), + isA(String.class))).thenReturn(pending[i]); // need to set pending resource in resource usage as well ResourceUsage ru = new ResourceUsage(); ru.setPending(pending[i]); @@ -1292,7 +1293,9 @@ public class TestProportionalCapacityPreemptionPolicy { } } else { System.out.println(indent + nq.getQueueName() - + " pen:" + ((LeafQueue) nq).getTotalResourcePending() + + " pen:" + + ((LeafQueue) nq).getTotalPendingResourcesConsideringUserLimit( + isA(Resource.class), isA(String.class)) + " cur:" + nq.getAbsoluteUsedCapacity() + " guar:" + nq.getAbsoluteCapacity() ); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index bbcb625ab56..512f37c292c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -1219,6 +1220,11 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { qc.setAbsoluteMaximumCapacity(partitionName, absMax); qc.setAbsoluteUsedCapacity(partitionName, absUsed); ru.setPending(partitionName, pending); + if (!isParent(queueExprArray, idx)) { + LeafQueue lq = (LeafQueue) queue; + when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), + isA(String.class))).thenReturn(pending); + } ru.setUsed(partitionName, parseResourceFromString(values[2].trim())); LOG.debug("Setup queue=" + queueName + " partition=" + partitionName + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index d3365de67a6..479e25a880e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -89,6 +90,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; +import org.mortbay.log.Log; public class TestLeafQueue { private final RecordFactory recordFactory = @@ -2705,6 +2707,363 @@ public class TestLeafQueue { assertEquals(1, app_1.getLiveContainers().size()); } + @Test + public void testGetTotalPendingResourcesConsideringUserLimitOneUser() + throws Exception { + // Manipulate queue 'e' + LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E)); + // Allow queue 'e' to use 100% of cluster resources (max capacity). + e.setMaxCapacity(1.0f); + // When used queue resources goes above capacity (in this case, 1%), user + // resource limit (used in calculating headroom) is calculated in small + // increments to ensure that user-limit-percent can be met for all users in + // a queue. Take user-limit-percent out of the equation so that user + // resource limit will always be calculated to its max possible value. + e.setUserLimit(1000); + + final String user_0 = "user_0"; + + // Submit 2 applications for user_0 + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, e, + mock(ActiveUsersManager.class), spyRMContext); + e.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, e, + mock(ActiveUsersManager.class), spyRMContext); + e.submitApplicationAttempt(app_1, user_0); // same user + + // Setup 1 node with 100GB of memory resources. + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 100*GB); + + final int numNodes = 1; + Resource clusterResource = + Resources.createResource(numNodes * (100*GB), numNodes * 128); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Pending resource requests for app_0 and app_1 total 5GB. + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true, + priority, recordFactory))); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + // Start testing... + + // Assign 1st Container of 1GB + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0, + // all users (only user_0) queue 'e' should be able to consume 1GB. + // The first container should be assigned to app_0 with no headroom left + // even though user_0's apps are still asking for a total of 4GB. + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + + // Assign 2nd container of 1GB + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // user_0 has no headroom due to user-limit-factor of 1.0. However capacity + // scheduler will assign one container more than user-limit-factor. + // This container also went to app_0. Still with no neadroom even though + // app_0 and app_1 are asking for a cumulative 3GB. + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + + // Can't allocate 3rd container due to user-limit. Headroom still 0. + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + + // Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB). + // Pending for both app_0 and app_1 are still 3GB, so user-limit-factor + // is no longer limiting the return value of + // getTotalPendingResourcesConsideringUserLimit() + e.setUserLimitFactor(10.0f); + assertEquals(3*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // app_0 is now satisified, app_1 is still asking for 2GB. + assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + + // Get the last 2 containers for app_1, no more pending requests. + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + + // Release each container from app_0 + for (RMContainer rmContainer : app_0.getLiveContainers()) { + e.completedContainer(clusterResource, app_0, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null, true); + } + + // Release each container from app_1 + for (RMContainer rmContainer : app_1.getLiveContainers()) { + e.completedContainer(clusterResource, app_1, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null, true); + } + } + + @Test + public void testGetTotalPendingResourcesConsideringUserLimitTwoUsers() + throws Exception { + // Manipulate queue 'e' + LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E)); + // Allow queue 'e' to use 100% of cluster resources (max capacity). + e.setMaxCapacity(1.0f); + // When used queue resources goes above capacity (in this case, 1%), user + // resource limit (used in calculating headroom) is calculated in small + // increments to ensure that user-limit-percent can be met for all users in + // a queue. Take user-limit-percent out of the equation so that user + // resource limit will always be calculated to its max possible value. + e.setUserLimit(1000); + + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit 2 applications for user_0 + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, e, + mock(ActiveUsersManager.class), spyRMContext); + e.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, e, + mock(ActiveUsersManager.class), spyRMContext); + e.submitApplicationAttempt(app_1, user_0); + + // Submit 2 applications for user_1 + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, e, + mock(ActiveUsersManager.class), spyRMContext); + e.submitApplicationAttempt(app_2, user_1); + + final ApplicationAttemptId appAttemptId_3 = + TestUtils.getMockApplicationAttemptId(3, 0); + FiCaSchedulerApp app_3 = + new FiCaSchedulerApp(appAttemptId_3, user_1, e, + mock(ActiveUsersManager.class), spyRMContext); + e.submitApplicationAttempt(app_3, user_1); + + // Setup 1 node with 100GB of memory resources. + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 100*GB); + + final int numNodes = 1; + Resource clusterResource = + Resources.createResource(numNodes * (100*GB), numNodes * 128); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Pending resource requests for user_0: app_0 and app_1 total 3GB. + Priority priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + // Pending resource requests for user_1: app_2 and app_3 total 1GB. + priority = TestUtils.createMockPriority(1); + app_2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + app_3.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + + // Start testing... + // With queue capacity set at 1% of 100GB and user-limit-factor set to 1.0, + // queue 'e' should be able to consume 1GB per user. + assertEquals(2*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + // None of the apps have assigned resources + // user_0's apps: + assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + // user_1's apps: + assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); + + // Assign 1st Container of 1GB + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // The first container was assigned to user_0's app_0. Queues total headroom + // has 1GB left for user_1. + assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + // user_0's apps: + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + // user_1's apps: + assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); + + // Assign 2nd container of 1GB + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // user_0 has no headroom due to user-limit-factor of 1.0. However capacity + // scheduler will assign one container more than user-limit-factor. So, + // this container went to user_0's app_1. so, headroom for queue 'e'e is + // still 1GB for user_1 + assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + // user_0's apps: + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + // user_1's apps: + assertEquals(0*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); + + // Assign 3rd container. + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // Container was allocated to user_1's app_2 since user_1, Now, no headroom + // is left. + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + // user_0's apps: + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + // user_1's apps: + assertEquals(1*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); + + // Assign 4th container. + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // Allocated to user_1's app_2 since scheduler allocates 1 container + // above user resource limit. Available headroom still 0. + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + // user_0's apps: + int app_0_consumption = app_0.getCurrentConsumption().getMemory(); + assertEquals(1*GB, app_0_consumption); + int app_1_consumption = app_1.getCurrentConsumption().getMemory(); + assertEquals(1*GB, app_1_consumption); + // user_1's apps: + int app_2_consumption = app_2.getCurrentConsumption().getMemory(); + assertEquals(2*GB, app_2_consumption); + int app_3_consumption = app_3.getCurrentConsumption().getMemory(); + assertEquals(0*GB, app_3_consumption); + + // Attempt to assign 5th container. Will be a no-op. + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // Cannot allocate 5th container because both users are above their allowed + // user resource limit. Values should be the same as previously. + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + // user_0's apps: + assertEquals(app_0_consumption, app_0.getCurrentConsumption().getMemory()); + assertEquals(app_1_consumption, app_1.getCurrentConsumption().getMemory()); + // user_1's apps: + assertEquals(app_2_consumption, app_2.getCurrentConsumption().getMemory()); + assertEquals(app_3_consumption, app_3.getCurrentConsumption().getMemory()); + + // Increase user-limit-factor from 1GB to 10GB (1% * 10 * 100GB = 10GB). + // Pending for both user_0 and user_1 are still 1GB each, so user-limit- + // factor is no longer the limiting factor. + e.setUserLimitFactor(10.0f); + + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // Next container goes to user_0's app_1, since it still wanted 1GB. + assertEquals(1*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + // user_0's apps: + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + // user_1's apps: + assertEquals(2*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_3.getCurrentConsumption().getMemory()); + + e.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // Last container goes to user_1's app_3, since it still wanted 1GB. + // user_0's apps: + assertEquals(0*GB, e.getTotalPendingResourcesConsideringUserLimit( + clusterResource, RMNodeLabelsManager.NO_LABEL).getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); + // user_1's apps: + assertEquals(2*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_3.getCurrentConsumption().getMemory()); + + // Release each container from app_0 + for (RMContainer rmContainer : app_0.getLiveContainers()) { + e.completedContainer(clusterResource, app_0, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null, true); + } + + // Release each container from app_1 + for (RMContainer rmContainer : app_1.getLiveContainers()) { + e.completedContainer(clusterResource, app_1, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null, true); + } + } + private List createListOfApps(int noOfApps, String user, LeafQueue defaultQueue) { List appsLists = new ArrayList();