From 519e5a7dd2bd540105434ec3c8939b68f6c024f8 Mon Sep 17 00:00:00 2001 From: Jian He Date: Mon, 6 Oct 2014 15:47:48 -0700 Subject: [PATCH] YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when AM allocates. Contributed by Craig Welch --- hadoop-yarn-project/CHANGES.txt | 3 + .../capacity/CapacityHeadroomProvider.java | 65 ++++++++++++ .../scheduler/capacity/CapacityScheduler.java | 4 + .../scheduler/capacity/LeafQueue.java | 74 +++++++++++--- .../common/fica/FiCaSchedulerApp.java | 28 ++++++ .../capacity/TestApplicationLimits.java | 18 ++-- .../scheduler/capacity/TestLeafQueue.java | 98 ++++++++++++++++++- 7 files changed, 263 insertions(+), 27 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a62202b9204..5cb2fc7305a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -582,6 +582,9 @@ Release 2.6.0 - UNRELEASED YARN-2611. Fixing jenkins findbugs warning and TestRMWebServicesCapacitySched for branch YARN-1051. (Subru Krishnan and Carlo Curino via subru) + YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when + AM allocates. (Craig Welch via jianhe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java new file mode 100644 index 00000000000..f79d19507eb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.api.records.Resource; + +public class CapacityHeadroomProvider { + + LeafQueue.User user; + LeafQueue queue; + FiCaSchedulerApp application; + Resource required; + LeafQueue.QueueHeadroomInfo queueHeadroomInfo; + + public CapacityHeadroomProvider( + LeafQueue.User user, + LeafQueue queue, + FiCaSchedulerApp application, + Resource required, + LeafQueue.QueueHeadroomInfo queueHeadroomInfo) { + + this.user = user; + this.queue = queue; + this.application = application; + this.required = required; + this.queueHeadroomInfo = queueHeadroomInfo; + + } + + public Resource getHeadroom() { + + Resource queueMaxCap; + Resource clusterResource; + synchronized (queueHeadroomInfo) { + queueMaxCap = queueHeadroomInfo.getQueueMaxCap(); + clusterResource = queueHeadroomInfo.getClusterResource(); + } + Resource headroom = queue.getHeadroom(user, queueMaxCap, + clusterResource, application, required); + + // Corner case to deal with applications being slightly over-limit + if (headroom.getMemory() < 0) { + headroom.setMemory(0); + } + return headroom; + + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6a3c7dc030d..02f27b8cdca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -469,6 +469,10 @@ public class CapacityScheduler extends // Re-configure queues root.reinitialize(newRoot, clusterResource); initializeQueueMappings(); + + // Re-calculate headroom for active applications + root.updateClusterResource(clusterResource); + } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index f0cff71df72..57f0907eb59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -132,6 +132,8 @@ public class LeafQueue implements CSQueue { private boolean reservationsContinueLooking; + private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo(); + public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) { this.scheduler = cs; @@ -970,6 +972,22 @@ public class LeafQueue implements CSQueue { // "re-reservation" is *free* return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); } + + protected Resource getHeadroom(User user, Resource queueMaxCap, + Resource clusterResource, FiCaSchedulerApp application, Resource required) { + return getHeadroom(user, queueMaxCap, clusterResource, + computeUserLimit(application, clusterResource, required, user)); + } + + private Resource getHeadroom(User user, Resource queueMaxCap, + Resource clusterResource, Resource userLimit) { + Resource headroom = + Resources.subtract( + Resources.min(resourceCalculator, clusterResource, + userLimit, queueMaxCap), + user.getConsumedResources()); + return headroom; + } @Private @@ -1038,12 +1056,14 @@ public class LeafQueue implements CSQueue { String user = application.getUser(); + User queueUser = getUser(user); + /** * Headroom is min((userLimit, queue-max-cap) - consumed) */ Resource userLimit = // User limit - computeUserLimit(application, clusterResource, required); + computeUserLimit(application, clusterResource, required, queueUser); //Max avail capacity needs to take into account usage by ancestor-siblings //which are greater than their base capacity, so we are interested in "max avail" @@ -1057,23 +1077,27 @@ public class LeafQueue implements CSQueue { clusterResource, absoluteMaxAvailCapacity, minimumAllocation); + + synchronized (queueHeadroomInfo) { + queueHeadroomInfo.setQueueMaxCap(queueMaxCap); + queueHeadroomInfo.setClusterResource(clusterResource); + } - Resource userConsumed = getUser(user).getConsumedResources(); - Resource headroom = - Resources.subtract( - Resources.min(resourceCalculator, clusterResource, - userLimit, queueMaxCap), - userConsumed); + Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + " queueMaxCap=" + queueMaxCap + - " consumed=" + userConsumed + + " consumed=" + queueUser.getConsumedResources() + " headroom=" + headroom); } - application.setHeadroom(headroom); + CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider( + queueUser, this, application, required, queueHeadroomInfo); + + application.setHeadroomProvider(headroomProvider); + metrics.setAvailableResourcesToUser(user, headroom); return userLimit; @@ -1081,7 +1105,7 @@ public class LeafQueue implements CSQueue { @Lock(NoLock.class) private Resource computeUserLimit(FiCaSchedulerApp application, - Resource clusterResource, Resource required) { + Resource clusterResource, Resource required, User user) { // What is our current capacity? // * It is equal to the max(required, queue-capacity) if // we're running below capacity. The 'max' ensures that jobs in queues @@ -1138,7 +1162,7 @@ public class LeafQueue implements CSQueue { " userLimit=" + userLimit + " userLimitFactor=" + userLimitFactor + " required: " + required + - " consumed: " + getUser(userName).getConsumedResources() + + " consumed: " + user.getConsumedResources() + " limit: " + limit + " queueCapacity: " + queueCapacity + " qconsumed: " + usedResources + @@ -1687,9 +1711,6 @@ public class LeafQueue implements CSQueue { String userName = application.getUser(); User user = getUser(userName); user.assignContainer(resource); - // Note this is a bit unconventional since it gets the object and modifies it here - // rather then using set routine - Resources.subtractFrom(application.getHeadroom(), resource); // headroom metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); if (LOG.isDebugEnabled()) { @@ -1896,4 +1917,29 @@ public class LeafQueue implements CSQueue { public void setMaxApplications(int maxApplications) { this.maxApplications = maxApplications; } + + /* + * Holds shared values used by all applications in + * the queue to calculate headroom on demand + */ + static class QueueHeadroomInfo { + private Resource queueMaxCap; + private Resource clusterResource; + + public void setQueueMaxCap(Resource queueMaxCap) { + this.queueMaxCap = queueMaxCap; + } + + public Resource getQueueMaxCap() { + return queueMaxCap; + } + + public void setClusterResource(Resource clusterResource) { + this.clusterResource = clusterResource; + } + + public Resource getClusterResource() { + return clusterResource; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index dc0d0f01d71..2f9569c6cbc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -64,6 +65,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private final Set containersToPreempt = new HashSet(); + + private CapacityHeadroomProvider headroomProvider; public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -280,6 +283,31 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { } return null; } + + public synchronized void setHeadroomProvider( + CapacityHeadroomProvider headroomProvider) { + this.headroomProvider = headroomProvider; + } + + public synchronized CapacityHeadroomProvider getHeadroomProvider() { + return headroomProvider; + } + + @Override + public synchronized Resource getHeadroom() { + if (headroomProvider != null) { + return headroomProvider.getHeadroom(); + } + return super.getHeadroom(); + } + + @Override + public synchronized void transferStateFromPreviousAttempt( + SchedulerApplicationAttempt appAttempt) { + super.transferStateFromPreviousAttempt(appAttempt); + this.headroomProvider = + ((FiCaSchedulerApp) appAttempt).getHeadroomProvider(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index ff8e873b15d..b922c02a5c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -518,7 +518,7 @@ public class TestApplicationLimits { // Schedule to compute queue.assignContainers(clusterResource, node_0, false); Resource expectedHeadroom = Resources.createResource(10*16*GB, 1); - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); // Submit second application from user_0, check headroom final ApplicationAttemptId appAttemptId_0_1 = @@ -536,8 +536,8 @@ public class TestApplicationLimits { // Schedule to compute queue.assignContainers(clusterResource, node_0, false); // Schedule to compute - verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = @@ -556,17 +556,17 @@ public class TestApplicationLimits { // Schedule to compute queue.assignContainers(clusterResource, node_0, false); // Schedule to compute expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom)); - verify(app_1_0).setHeadroom(eq(expectedHeadroom)); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); // Now reduce cluster size and check for the smaller headroom clusterResource = Resources.createResource(90*16*GB); queue.assignContainers(clusterResource, node_0, false); // Schedule to compute expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes - verify(app_0_0).setHeadroom(eq(expectedHeadroom)); - verify(app_0_1).setHeadroom(eq(expectedHeadroom)); - verify(app_1_0).setHeadroom(eq(expectedHeadroom)); + assertEquals(expectedHeadroom, app_0_0.getHeadroom()); + assertEquals(expectedHeadroom, app_0_1.getHeadroom()); + assertEquals(expectedHeadroom, app_1_0.getHeadroom()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 092ff83fdd4..9e06c524377 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -639,6 +639,94 @@ public class TestLeafQueue { assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); } + @Test + public void testUserHeadroomMultiApp() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_1, user_0); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, + a.getActiveUsersManager(), spyRMContext); + a.submitApplicationAttempt(app_2, user_1); + + // Setup some nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, + 0, 16*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, + 0, 16*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + Priority priority = TestUtils.createMockPriority(1); + + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + priority, recordFactory))); + + a.assignContainers(clusterResource, node_0, false); + assertEquals(1*GB, a.getUsedResources().getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); + //Now, headroom is the same for all apps for a given user + queue combo + //and a change to any app's headroom is reflected for all the user's apps + //once those apps are active/have themselves calculated headroom for + //allocation at least one time + assertEquals(2*GB, app_0.getHeadroom().getMemory()); + assertEquals(0*GB, app_1.getHeadroom().getMemory());//not yet active + assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active + + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + a.assignContainers(clusterResource, node_0, false); + assertEquals(2*GB, a.getUsedResources().getMemory()); + assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); + assertEquals(1*GB, app_0.getHeadroom().getMemory()); + assertEquals(1*GB, app_1.getHeadroom().getMemory());//now active + assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active + + //Complete container and verify that headroom is updated, for both apps + //for the user + RMContainer rmContainer = app_0.getLiveContainers().iterator().next(); + a.completedContainer(clusterResource, app_0, node_0, rmContainer, + ContainerStatus.newInstance(rmContainer.getContainerId(), + ContainerState.COMPLETE, "", + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER), + RMContainerEventType.KILL, null, true); + + assertEquals(2*GB, app_0.getHeadroom().getMemory()); + assertEquals(2*GB, app_1.getHeadroom().getMemory()); + } + @Test public void testHeadroomWithMaxCap() throws Exception { // Mock the queue @@ -710,16 +798,18 @@ public class TestLeafQueue { assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); - assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G - assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G + assertEquals(2*GB, app_0.getHeadroom().getMemory()); + // User limit = 4G, 2 in use + assertEquals(0*GB, app_1.getHeadroom().getMemory()); + // the application is not yet active // Again one to user_0 since he hasn't exceeded user limit yet a.assignContainers(clusterResource, node_0, false); assertEquals(3*GB, a.getUsedResources().getMemory()); assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(1*GB, app_1.getCurrentConsumption().getMemory()); - assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G - assertEquals(0*GB, app_1.getHeadroom().getMemory()); // 3G - 2G + assertEquals(1*GB, app_0.getHeadroom().getMemory()); // 4G - 3G + assertEquals(1*GB, app_1.getHeadroom().getMemory()); // 4G - 3G // Submit requests for app_1 and set max-cap a.setMaxCapacity(.1f);