YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when AM allocates. Contributed by Craig Welch
(cherry picked from commit 519e5a7dd2)
This commit is contained in:
parent
eeb39dc218
commit
5c33e91229
|
@ -552,6 +552,9 @@ Release 2.6.0 - UNRELEASED
|
|||
YARN-2611. Fixing jenkins findbugs warning and TestRMWebServicesCapacitySched
|
||||
for branch YARN-1051. (Subru Krishnan and Carlo Curino via subru)
|
||||
|
||||
YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when
|
||||
AM allocates. (Craig Welch via jianhe)
|
||||
|
||||
Release 2.5.1 - 2014-09-05
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
|
||||
public class CapacityHeadroomProvider {
|
||||
|
||||
LeafQueue.User user;
|
||||
LeafQueue queue;
|
||||
FiCaSchedulerApp application;
|
||||
Resource required;
|
||||
LeafQueue.QueueHeadroomInfo queueHeadroomInfo;
|
||||
|
||||
public CapacityHeadroomProvider(
|
||||
LeafQueue.User user,
|
||||
LeafQueue queue,
|
||||
FiCaSchedulerApp application,
|
||||
Resource required,
|
||||
LeafQueue.QueueHeadroomInfo queueHeadroomInfo) {
|
||||
|
||||
this.user = user;
|
||||
this.queue = queue;
|
||||
this.application = application;
|
||||
this.required = required;
|
||||
this.queueHeadroomInfo = queueHeadroomInfo;
|
||||
|
||||
}
|
||||
|
||||
public Resource getHeadroom() {
|
||||
|
||||
Resource queueMaxCap;
|
||||
Resource clusterResource;
|
||||
synchronized (queueHeadroomInfo) {
|
||||
queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
|
||||
clusterResource = queueHeadroomInfo.getClusterResource();
|
||||
}
|
||||
Resource headroom = queue.getHeadroom(user, queueMaxCap,
|
||||
clusterResource, application, required);
|
||||
|
||||
// Corner case to deal with applications being slightly over-limit
|
||||
if (headroom.getMemory() < 0) {
|
||||
headroom.setMemory(0);
|
||||
}
|
||||
return headroom;
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -469,6 +469,10 @@ public class CapacityScheduler extends
|
|||
// Re-configure queues
|
||||
root.reinitialize(newRoot, clusterResource);
|
||||
initializeQueueMappings();
|
||||
|
||||
// Re-calculate headroom for active applications
|
||||
root.updateClusterResource(clusterResource);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -132,6 +132,8 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
private boolean reservationsContinueLooking;
|
||||
|
||||
private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
|
||||
|
||||
public LeafQueue(CapacitySchedulerContext cs,
|
||||
String queueName, CSQueue parent, CSQueue old) {
|
||||
this.scheduler = cs;
|
||||
|
@ -970,6 +972,22 @@ public class LeafQueue implements CSQueue {
|
|||
// "re-reservation" is *free*
|
||||
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
|
||||
}
|
||||
|
||||
protected Resource getHeadroom(User user, Resource queueMaxCap,
|
||||
Resource clusterResource, FiCaSchedulerApp application, Resource required) {
|
||||
return getHeadroom(user, queueMaxCap, clusterResource,
|
||||
computeUserLimit(application, clusterResource, required, user));
|
||||
}
|
||||
|
||||
private Resource getHeadroom(User user, Resource queueMaxCap,
|
||||
Resource clusterResource, Resource userLimit) {
|
||||
Resource headroom =
|
||||
Resources.subtract(
|
||||
Resources.min(resourceCalculator, clusterResource,
|
||||
userLimit, queueMaxCap),
|
||||
user.getConsumedResources());
|
||||
return headroom;
|
||||
}
|
||||
|
||||
|
||||
@Private
|
||||
|
@ -1038,12 +1056,14 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
String user = application.getUser();
|
||||
|
||||
User queueUser = getUser(user);
|
||||
|
||||
/**
|
||||
* Headroom is min(userLimit, queue-max-cap) - consumed
|
||||
*/
|
||||
|
||||
Resource userLimit = // User limit
|
||||
computeUserLimit(application, clusterResource, required);
|
||||
computeUserLimit(application, clusterResource, required, queueUser);
|
||||
|
||||
//Max avail capacity needs to take into account usage by ancestor-siblings
|
||||
//which are greater than their base capacity, so we are interested in "max avail"
|
||||
|
@ -1057,23 +1077,27 @@ public class LeafQueue implements CSQueue {
|
|||
clusterResource,
|
||||
absoluteMaxAvailCapacity,
|
||||
minimumAllocation);
|
||||
|
||||
synchronized (queueHeadroomInfo) {
|
||||
queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
|
||||
queueHeadroomInfo.setClusterResource(clusterResource);
|
||||
}
|
||||
|
||||
Resource userConsumed = getUser(user).getConsumedResources();
|
||||
Resource headroom =
|
||||
Resources.subtract(
|
||||
Resources.min(resourceCalculator, clusterResource,
|
||||
userLimit, queueMaxCap),
|
||||
userConsumed);
|
||||
Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Headroom calculation for user " + user + ": " +
|
||||
" userLimit=" + userLimit +
|
||||
" queueMaxCap=" + queueMaxCap +
|
||||
" consumed=" + userConsumed +
|
||||
" consumed=" + queueUser.getConsumedResources() +
|
||||
" headroom=" + headroom);
|
||||
}
|
||||
|
||||
application.setHeadroom(headroom);
|
||||
CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
|
||||
queueUser, this, application, required, queueHeadroomInfo);
|
||||
|
||||
application.setHeadroomProvider(headroomProvider);
|
||||
|
||||
metrics.setAvailableResourcesToUser(user, headroom);
|
||||
|
||||
return userLimit;
|
||||
|
@ -1081,7 +1105,7 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
@Lock(NoLock.class)
|
||||
private Resource computeUserLimit(FiCaSchedulerApp application,
|
||||
Resource clusterResource, Resource required) {
|
||||
Resource clusterResource, Resource required, User user) {
|
||||
// What is our current capacity?
|
||||
// * It is equal to the max(required, queue-capacity) if
|
||||
// we're running below capacity. The 'max' ensures that jobs in queues
|
||||
|
@ -1138,7 +1162,7 @@ public class LeafQueue implements CSQueue {
|
|||
" userLimit=" + userLimit +
|
||||
" userLimitFactor=" + userLimitFactor +
|
||||
" required: " + required +
|
||||
" consumed: " + getUser(userName).getConsumedResources() +
|
||||
" consumed: " + user.getConsumedResources() +
|
||||
" limit: " + limit +
|
||||
" queueCapacity: " + queueCapacity +
|
||||
" qconsumed: " + usedResources +
|
||||
|
@ -1687,9 +1711,6 @@ public class LeafQueue implements CSQueue {
|
|||
String userName = application.getUser();
|
||||
User user = getUser(userName);
|
||||
user.assignContainer(resource);
|
||||
// Note this is a bit unconventional since it gets the object and modifies it here
|
||||
// rather then using set routine
|
||||
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
|
||||
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -1896,4 +1917,29 @@ public class LeafQueue implements CSQueue {
|
|||
public void setMaxApplications(int maxApplications) {
|
||||
this.maxApplications = maxApplications;
|
||||
}
|
||||
|
||||
/*
|
||||
* Holds shared values used by all applications in
|
||||
* the queue to calculate headroom on demand
|
||||
*/
|
||||
static class QueueHeadroomInfo {
|
||||
private Resource queueMaxCap;
|
||||
private Resource clusterResource;
|
||||
|
||||
public void setQueueMaxCap(Resource queueMaxCap) {
|
||||
this.queueMaxCap = queueMaxCap;
|
||||
}
|
||||
|
||||
public Resource getQueueMaxCap() {
|
||||
return queueMaxCap;
|
||||
}
|
||||
|
||||
public void setClusterResource(Resource clusterResource) {
|
||||
this.clusterResource = clusterResource;
|
||||
}
|
||||
|
||||
public Resource getClusterResource() {
|
||||
return clusterResource;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
|
||||
|
||||
/**
|
||||
* Represents an application attempt from the viewpoint of the FIFO or Capacity
|
||||
|
@ -64,6 +65,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
|
||||
private final Set<ContainerId> containersToPreempt =
|
||||
new HashSet<ContainerId>();
|
||||
|
||||
private CapacityHeadroomProvider headroomProvider;
|
||||
|
||||
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
|
||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||
|
@ -280,6 +283,31 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public synchronized void setHeadroomProvider(
|
||||
CapacityHeadroomProvider headroomProvider) {
|
||||
this.headroomProvider = headroomProvider;
|
||||
}
|
||||
|
||||
public synchronized CapacityHeadroomProvider getHeadroomProvider() {
|
||||
return headroomProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Resource getHeadroom() {
|
||||
if (headroomProvider != null) {
|
||||
return headroomProvider.getHeadroom();
|
||||
}
|
||||
return super.getHeadroom();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void transferStateFromPreviousAttempt(
|
||||
SchedulerApplicationAttempt appAttempt) {
|
||||
super.transferStateFromPreviousAttempt(appAttempt);
|
||||
this.headroomProvider =
|
||||
((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -518,7 +518,7 @@ public class TestApplicationLimits {
|
|||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0, false);
|
||||
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
|
||||
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
|
||||
// Submit second application from user_0, check headroom
|
||||
final ApplicationAttemptId appAttemptId_0_1 =
|
||||
|
@ -536,8 +536,8 @@ public class TestApplicationLimits {
|
|||
|
||||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
|
||||
verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
|
||||
verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
|
||||
|
||||
// Submit first application from user_1, check for new headroom
|
||||
final ApplicationAttemptId appAttemptId_1_0 =
|
||||
|
@ -556,17 +556,17 @@ public class TestApplicationLimits {
|
|||
// Schedule to compute
|
||||
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
|
||||
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
|
||||
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
|
||||
verify(app_0_1).setHeadroom(eq(expectedHeadroom));
|
||||
verify(app_1_0).setHeadroom(eq(expectedHeadroom));
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
|
||||
assertEquals(expectedHeadroom, app_1_0.getHeadroom());
|
||||
|
||||
// Now reduce cluster size and check for the smaller headroom
|
||||
clusterResource = Resources.createResource(90*16*GB);
|
||||
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
|
||||
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
|
||||
verify(app_0_0).setHeadroom(eq(expectedHeadroom));
|
||||
verify(app_0_1).setHeadroom(eq(expectedHeadroom));
|
||||
verify(app_1_0).setHeadroom(eq(expectedHeadroom));
|
||||
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
|
||||
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
|
||||
assertEquals(expectedHeadroom, app_1_0.getHeadroom());
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -639,6 +639,94 @@ public class TestLeafQueue {
|
|||
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserHeadroomMultiApp() throws Exception {
|
||||
// Mock the queue
|
||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||
//unset maxCapacity
|
||||
a.setMaxCapacity(1.0f);
|
||||
|
||||
// Users
|
||||
final String user_0 = "user_0";
|
||||
final String user_1 = "user_1";
|
||||
|
||||
// Submit applications
|
||||
final ApplicationAttemptId appAttemptId_0 =
|
||||
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||
FiCaSchedulerApp app_0 =
|
||||
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||
a.getActiveUsersManager(), spyRMContext);
|
||||
a.submitApplicationAttempt(app_0, user_0);
|
||||
|
||||
final ApplicationAttemptId appAttemptId_1 =
|
||||
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||
FiCaSchedulerApp app_1 =
|
||||
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||
a.getActiveUsersManager(), spyRMContext);
|
||||
a.submitApplicationAttempt(app_1, user_0); // same user
|
||||
|
||||
final ApplicationAttemptId appAttemptId_2 =
|
||||
TestUtils.getMockApplicationAttemptId(2, 0);
|
||||
FiCaSchedulerApp app_2 =
|
||||
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
|
||||
a.getActiveUsersManager(), spyRMContext);
|
||||
a.submitApplicationAttempt(app_2, user_1);
|
||||
|
||||
// Setup some nodes
|
||||
String host_0 = "127.0.0.1";
|
||||
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK,
|
||||
0, 16*GB);
|
||||
String host_1 = "127.0.0.2";
|
||||
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK,
|
||||
0, 16*GB);
|
||||
|
||||
final int numNodes = 2;
|
||||
Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
|
||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||
|
||||
Priority priority = TestUtils.createMockPriority(1);
|
||||
|
||||
app_0.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
|
||||
priority, recordFactory)));
|
||||
|
||||
a.assignContainers(clusterResource, node_0, false);
|
||||
assertEquals(1*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||
//Now, headroom is the same for all apps for a given user + queue combo
|
||||
//and a change to any app's headroom is reflected for all the user's apps
|
||||
//once those apps are active/have themselves calculated headroom for
|
||||
//allocation at least one time
|
||||
assertEquals(2*GB, app_0.getHeadroom().getMemory());
|
||||
assertEquals(0*GB, app_1.getHeadroom().getMemory());//not yet active
|
||||
assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
|
||||
|
||||
app_1.updateResourceRequests(Collections.singletonList(
|
||||
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||
priority, recordFactory)));
|
||||
|
||||
a.assignContainers(clusterResource, node_0, false);
|
||||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||
assertEquals(1*GB, app_0.getHeadroom().getMemory());
|
||||
assertEquals(1*GB, app_1.getHeadroom().getMemory());//now active
|
||||
assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
|
||||
|
||||
//Complete container and verify that headroom is updated, for both apps
|
||||
//for the user
|
||||
RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
|
||||
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
|
||||
ContainerStatus.newInstance(rmContainer.getContainerId(),
|
||||
ContainerState.COMPLETE, "",
|
||||
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
|
||||
RMContainerEventType.KILL, null, true);
|
||||
|
||||
assertEquals(2*GB, app_0.getHeadroom().getMemory());
|
||||
assertEquals(2*GB, app_1.getHeadroom().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeadroomWithMaxCap() throws Exception {
|
||||
// Mock the queue
|
||||
|
@ -710,16 +798,18 @@ public class TestLeafQueue {
|
|||
assertEquals(2*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
|
||||
assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G
|
||||
assertEquals(2*GB, app_0.getHeadroom().getMemory());
|
||||
// User limit = 4G, 2 in use
|
||||
assertEquals(0*GB, app_1.getHeadroom().getMemory());
|
||||
// the application is not yet active
|
||||
|
||||
// Again one to user_0 since he hasn't exceeded user limit yet
|
||||
a.assignContainers(clusterResource, node_0, false);
|
||||
assertEquals(3*GB, a.getUsedResources().getMemory());
|
||||
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
|
||||
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
|
||||
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
|
||||
assertEquals(0*GB, app_1.getHeadroom().getMemory()); // 3G - 2G
|
||||
assertEquals(1*GB, app_0.getHeadroom().getMemory()); // 4G - 3G
|
||||
assertEquals(1*GB, app_1.getHeadroom().getMemory()); // 4G - 3G
|
||||
|
||||
// Submit requests for app_1 and set max-cap
|
||||
a.setMaxCapacity(.1f);
|
||||
|
|
Loading…
Reference in New Issue