YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when AM allocates. Contributed by Craig Welch

This commit is contained in:
Jian He 2014-10-06 15:47:48 -07:00
parent 8dc6abf2f4
commit 519e5a7dd2
7 changed files with 263 additions and 27 deletions

View File

@ -582,6 +582,9 @@ Release 2.6.0 - UNRELEASED
YARN-2611. Fixing jenkins findbugs warning and TestRMWebServicesCapacitySched
for branch YARN-1051. (Subru Krishnan and Carlo Curino via subru)
YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when
AM allocates. (Craig Welch via jianhe)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.api.records.Resource;
public class CapacityHeadroomProvider {
LeafQueue.User user;
LeafQueue queue;
FiCaSchedulerApp application;
Resource required;
LeafQueue.QueueHeadroomInfo queueHeadroomInfo;
public CapacityHeadroomProvider(
LeafQueue.User user,
LeafQueue queue,
FiCaSchedulerApp application,
Resource required,
LeafQueue.QueueHeadroomInfo queueHeadroomInfo) {
this.user = user;
this.queue = queue;
this.application = application;
this.required = required;
this.queueHeadroomInfo = queueHeadroomInfo;
}
public Resource getHeadroom() {
Resource queueMaxCap;
Resource clusterResource;
synchronized (queueHeadroomInfo) {
queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
clusterResource = queueHeadroomInfo.getClusterResource();
}
Resource headroom = queue.getHeadroom(user, queueMaxCap,
clusterResource, application, required);
// Corner case to deal with applications being slightly over-limit
if (headroom.getMemory() < 0) {
headroom.setMemory(0);
}
return headroom;
}
}

View File

@ -469,6 +469,10 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
// Re-calculate headroom for active applications
root.updateClusterResource(clusterResource);
}
/**

View File

@ -132,6 +132,8 @@ public class LeafQueue implements CSQueue {
private boolean reservationsContinueLooking;
private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) {
this.scheduler = cs;
@ -970,6 +972,22 @@ public class LeafQueue implements CSQueue {
// "re-reservation" is *free*
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
}
protected Resource getHeadroom(User user, Resource queueMaxCap,
Resource clusterResource, FiCaSchedulerApp application, Resource required) {
return getHeadroom(user, queueMaxCap, clusterResource,
computeUserLimit(application, clusterResource, required, user));
}
private Resource getHeadroom(User user, Resource queueMaxCap,
Resource clusterResource, Resource userLimit) {
Resource headroom =
Resources.subtract(
Resources.min(resourceCalculator, clusterResource,
userLimit, queueMaxCap),
user.getConsumedResources());
return headroom;
}
@Private
@ -1038,12 +1056,14 @@ public class LeafQueue implements CSQueue {
String user = application.getUser();
User queueUser = getUser(user);
/**
* Headroom is min((userLimit, queue-max-cap) - consumed)
*/
Resource userLimit = // User limit
computeUserLimit(application, clusterResource, required);
computeUserLimit(application, clusterResource, required, queueUser);
//Max avail capacity needs to take into account usage by ancestor-siblings
//which are greater than their base capacity, so we are interested in "max avail"
@ -1057,23 +1077,27 @@ public class LeafQueue implements CSQueue {
clusterResource,
absoluteMaxAvailCapacity,
minimumAllocation);
synchronized (queueHeadroomInfo) {
queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
queueHeadroomInfo.setClusterResource(clusterResource);
}
Resource userConsumed = getUser(user).getConsumedResources();
Resource headroom =
Resources.subtract(
Resources.min(resourceCalculator, clusterResource,
userLimit, queueMaxCap),
userConsumed);
Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
" queueMaxCap=" + queueMaxCap +
" consumed=" + userConsumed +
" consumed=" + queueUser.getConsumedResources() +
" headroom=" + headroom);
}
application.setHeadroom(headroom);
CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
queueUser, this, application, required, queueHeadroomInfo);
application.setHeadroomProvider(headroomProvider);
metrics.setAvailableResourcesToUser(user, headroom);
return userLimit;
@ -1081,7 +1105,7 @@ public class LeafQueue implements CSQueue {
@Lock(NoLock.class)
private Resource computeUserLimit(FiCaSchedulerApp application,
Resource clusterResource, Resource required) {
Resource clusterResource, Resource required, User user) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
// we're running below capacity. The 'max' ensures that jobs in queues
@ -1138,7 +1162,7 @@ public class LeafQueue implements CSQueue {
" userLimit=" + userLimit +
" userLimitFactor=" + userLimitFactor +
" required: " + required +
" consumed: " + getUser(userName).getConsumedResources() +
" consumed: " + user.getConsumedResources() +
" limit: " + limit +
" queueCapacity: " + queueCapacity +
" qconsumed: " + usedResources +
@ -1687,9 +1711,6 @@ public class LeafQueue implements CSQueue {
String userName = application.getUser();
User user = getUser(userName);
user.assignContainer(resource);
// Note this is a bit unconventional since it gets the object and modifies it here
// rather then using set routine
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
if (LOG.isDebugEnabled()) {
@ -1896,4 +1917,29 @@ public class LeafQueue implements CSQueue {
public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications;
}
/*
* Holds shared values used by all applications in
* the queue to calculate headroom on demand
*/
static class QueueHeadroomInfo {
private Resource queueMaxCap;
private Resource clusterResource;
public void setQueueMaxCap(Resource queueMaxCap) {
this.queueMaxCap = queueMaxCap;
}
public Resource getQueueMaxCap() {
return queueMaxCap;
}
public void setClusterResource(Resource clusterResource) {
this.clusterResource = clusterResource;
}
public Resource getClusterResource() {
return clusterResource;
}
}
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
/**
* Represents an application attempt from the viewpoint of the FIFO or Capacity
@ -64,6 +65,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
private CapacityHeadroomProvider headroomProvider;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@ -280,6 +283,31 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
return null;
}
public synchronized void setHeadroomProvider(
CapacityHeadroomProvider headroomProvider) {
this.headroomProvider = headroomProvider;
}
public synchronized CapacityHeadroomProvider getHeadroomProvider() {
return headroomProvider;
}
@Override
public synchronized Resource getHeadroom() {
if (headroomProvider != null) {
return headroomProvider.getHeadroom();
}
return super.getHeadroom();
}
@Override
public synchronized void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
super.transferStateFromPreviousAttempt(appAttempt);
this.headroomProvider =
((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
}
}

View File

@ -518,7 +518,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
@ -536,8 +536,8 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
@ -556,17 +556,17 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));
verify(app_1_0).setHeadroom(eq(expectedHeadroom));
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
assertEquals(expectedHeadroom, app_1_0.getHeadroom());
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setHeadroom(eq(expectedHeadroom));
verify(app_1_0).setHeadroom(eq(expectedHeadroom));
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
assertEquals(expectedHeadroom, app_1_0.getHeadroom());
}

View File

@ -639,6 +639,94 @@ public class TestLeafQueue {
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
}
@Test
public void testUserHeadroomMultiApp() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes
String host_0 = "127.0.0.1";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK,
0, 16*GB);
String host_1 = "127.0.0.2";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK,
0, 16*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
a.assignContainers(clusterResource, node_0, false);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
//Now, headroom is the same for all apps for a given user + queue combo
//and a change to any app's headroom is reflected for all the user's apps
//once those apps are active/have themselves calculated headroom for
//allocation at least one time
assertEquals(2*GB, app_0.getHeadroom().getMemory());
assertEquals(0*GB, app_1.getHeadroom().getMemory());//not yet active
assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
a.assignContainers(clusterResource, node_0, false);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_0.getHeadroom().getMemory());
assertEquals(1*GB, app_1.getHeadroom().getMemory());//now active
assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
//Complete container and verify that headroom is updated, for both apps
//for the user
RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
assertEquals(2*GB, app_0.getHeadroom().getMemory());
assertEquals(2*GB, app_1.getHeadroom().getMemory());
}
@Test
public void testHeadroomWithMaxCap() throws Exception {
// Mock the queue
@ -710,16 +798,18 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G
assertEquals(2*GB, app_0.getHeadroom().getMemory());
// User limit = 4G, 2 in use
assertEquals(0*GB, app_1.getHeadroom().getMemory());
// the application is not yet active
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
assertEquals(0*GB, app_1.getHeadroom().getMemory()); // 3G - 2G
assertEquals(1*GB, app_0.getHeadroom().getMemory()); // 4G - 3G
assertEquals(1*GB, app_1.getHeadroom().getMemory()); // 4G - 3G
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);