diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java index 4f901335d32..6ebed6e0576 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java @@ -151,6 +151,17 @@ public class ResourcePBImpl extends Resource { .newInstance(ResourceInformation.VCORES); this.setMemorySize(p.getMemory()); this.setVirtualCores(p.getVirtualCores()); + + // Update missing resource information on respective index. + updateResourceInformationMap(types); + } + + private void updateResourceInformationMap(ResourceInformation[] types) { + for (int i = 0; i < types.length; i++) { + if (resources[i] == null) { + resources[i] = ResourceInformation.newInstance(types[i]); + } + } } private static ResourceInformation newDefaultInformation( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 30bff78f325..1426e881385 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1027,6 +1027,8 @@ public class TestLeafQueue { Resource clusterResource = Resources.createResource(numNodes * (8*GB), numNodes * 16); when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); // Setup resource-requests // app_0 asks for 3 3-GB containers @@ -1083,9 +1085,15 @@ public class TestLeafQueue { a.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); - assertEquals(12*GB, a.getUsedResources().getMemorySize()); - assertEquals(12*GB, app_0.getCurrentConsumption().getMemorySize()); - assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize()); + assertEquals(9*GB, a.getUsedResources().getMemorySize()); + assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); + + assertEquals(4 * GB, + app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize()); + + assertEquals(1 * GB, + app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize()); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -1317,11 +1325,6 @@ public class TestLeafQueue { Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); when(csContext.getNumClusterNodes()).thenReturn(numNodes); - ParentQueue root = (ParentQueue) queues - .get(CapacitySchedulerConfiguration.ROOT); - root.updateClusterResource(clusterResource, - new ResourceLimits(clusterResource)); - // Setup resource-requests Priority priority = TestUtils.createMockPriority(1); app_0.updateResourceRequests(Collections.singletonList( @@ -1340,6 +1343,11 @@ public class TestLeafQueue { a.setUserLimit(50); a.setUserLimitFactor(2); + ParentQueue root = (ParentQueue) queues + .get(CapacitySchedulerConfiguration.ROOT); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + // Now, only user_0 should be active since he is the only one with // outstanding requests assertEquals("There should only be 1 active user!", @@ -1368,8 +1376,8 @@ public class TestLeafQueue { assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize()); assertEquals(0*GB, app_0.getHeadroom().getMemorySize()); - assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G - + assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); + // Submit requests for app_1 and set max-cap a.setMaxCapacity(.1f); root.updateClusterResource(clusterResource, @@ -1404,6 +1412,108 @@ public class TestLeafQueue { assertEquals(0*GB, app_2.getHeadroom().getMemorySize()); // hit queue max-cap } + @Test + public void testUserHeadroomMultiApp() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + // unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_1, user_0); // same user + + final ApplicationAttemptId appAttemptId_2 = TestUtils + .getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = new FiCaSchedulerApp(appAttemptId_2, user_1, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_2, user_1); + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 16 * GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 16 * GB); + + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1, app_2.getApplicationAttemptId(), app_2); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (16 * GB), + 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + Priority priority = TestUtils.createMockPriority(1); + + app_0.updateResourceRequests( + Collections.singletonList(TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory))); + + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + a, nodes, apps); + assertEquals(1 * GB, a.getUsedResources().getMemorySize()); + assertEquals(1 * GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize()); + // Now, headroom is the same for all apps for a given user + queue combo + // and a change to any app's headroom is reflected for all the user's apps + // once those apps are active/have themselves calculated headroom for + // allocation at least one time + assertEquals(2 * GB, app_0.getHeadroom().getMemorySize()); + assertEquals(2 * GB, app_1.getHeadroom().getMemorySize());// not yet active + assertEquals(3 * GB, app_2.getHeadroom().getMemorySize());// not yet active + + app_1.updateResourceRequests( + Collections.singletonList(TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 2, true, priority, recordFactory))); + + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + a, nodes, apps); + assertEquals(2 * GB, a.getUsedResources().getMemorySize()); + assertEquals(1 * GB, app_0.getCurrentConsumption().getMemorySize()); + assertEquals(1 * GB, app_1.getCurrentConsumption().getMemorySize()); + assertEquals(1 * GB, app_0.getHeadroom().getMemorySize()); + assertEquals(1 * GB, app_1.getHeadroom().getMemorySize());// now active + assertEquals(3 * GB, app_2.getHeadroom().getMemorySize());// not yet active + + // Complete container and verify that headroom is updated, for both apps + // for the user + RMContainer rmContainer = app_0.getLiveContainers().iterator().next(); + a.completedContainer(clusterResource, app_0, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null, true); + + assertEquals(2 * GB, app_0.getHeadroom().getMemorySize()); + assertEquals(2 * GB, app_1.getHeadroom().getMemorySize()); + } + @Test public void testSingleQueueWithMultipleUsers() throws Exception {