MAPREDUCE-3752. Modified application limits to include queue max-capacities besides the usual user limits. Contributed by Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1239422 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-02-02 00:41:09 +00:00
parent b4929bcf14
commit ef1a619a4d
6 changed files with 207 additions and 34 deletions

View File

@ -651,6 +651,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files. MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files.
(Arun C Murthy via sseth) (Arun C Murthy via sseth)
MAPREDUCE-3752. Modified application limits to include queue max-capacities
besides the usual user limits. (Arun C Murthy via vinodkv)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -111,4 +111,12 @@ public static boolean greaterThan(Resource lhs, Resource rhs) {
public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) { public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
return lhs.getMemory() >= rhs.getMemory(); return lhs.getMemory() >= rhs.getMemory();
} }
public static Resource min(Resource lhs, Resource rhs) {
return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs;
}
public static Resource max(Resource lhs, Resource rhs) {
return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs;
}
} }

View File

@ -162,6 +162,13 @@ synchronized public void updateResourceRequests(
asks.put(hostName, request); asks.put(hostName, request);
if (updatePendingResources) { if (updatePendingResources) {
// Similarly, deactivate application?
if (request.getNumContainers() <= 0) {
LOG.info("checking for deactivate... ");
checkForDeactivation();
}
int lastRequestContainers = lastRequest != null ? lastRequest int lastRequestContainers = lastRequest != null ? lastRequest
.getNumContainers() : 0; .getNumContainers() : 0;
Resource lastRequestCapability = lastRequest != null ? lastRequest Resource lastRequestCapability = lastRequest != null ? lastRequest
@ -308,6 +315,11 @@ synchronized private void decrementOutstanding(
// Do we have any outstanding requests? // Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application // If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) { if (numOffSwitchContainers == 0) {
checkForDeactivation();
}
}
synchronized private void checkForDeactivation() {
boolean deactivate = true; boolean deactivate = true;
for (Priority priority : getPriorities()) { for (Priority priority : getPriorities()) {
ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY); ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
@ -320,7 +332,7 @@ synchronized private void decrementOutstanding(
activeUsersManager.deactivateApplication(user, applicationId); activeUsersManager.deactivateApplication(user, applicationId);
} }
} }
}
synchronized private void allocate(Container container) { synchronized private void allocate(Container container) {
// Update consumption and track allocations // Update consumption and track allocations
//TODO: fixme sharad //TODO: fixme sharad

View File

@ -751,13 +751,13 @@ private synchronized SchedulerApp getApplication(
continue; continue;
} }
// Compute & set headroom // Compute user-limit & set headroom
// Note: We set the headroom with the highest priority request // Note: We compute both user-limit & headroom with the highest
// as the target. // priority request as the target.
// This works since we never assign lower priority requests // This works since we never assign lower priority requests
// before all higher priority ones are serviced. // before all higher priority ones are serviced.
Resource userLimit = Resource userLimit =
computeAndSetUserResourceLimit(application, clusterResource, computeUserLimitAndSetHeadroom(application, clusterResource,
required); required);
// Check queue max-capacity limit // Check queue max-capacity limit
@ -778,12 +778,12 @@ private synchronized SchedulerApp getApplication(
assignContainersOnNode(clusterResource, node, application, priority, assignContainersOnNode(clusterResource, node, application, priority,
null); null);
Resource assigned = assignment.getResource();
// Did we schedule or reserve a container? // Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
if (Resources.greaterThan(assigned, Resources.none())) { if (Resources.greaterThan(assigned, Resources.none())) {
// Book-keeping // Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned); allocateResource(clusterResource, application, assigned);
// Reset scheduling opportunities // Reset scheduling opportunities
@ -854,20 +854,50 @@ private synchronized boolean assignToQueue(Resource clusterResource,
} }
@Lock({LeafQueue.class, SchedulerApp.class}) @Lock({LeafQueue.class, SchedulerApp.class})
private Resource computeAndSetUserResourceLimit(SchedulerApp application, private Resource computeUserLimitAndSetHeadroom(
Resource clusterResource, Resource required) { SchedulerApp application, Resource clusterResource, Resource required) {
String user = application.getUser(); String user = application.getUser();
Resource limit = computeUserLimit(application, clusterResource, required);
/**
* Headroom is min((userLimit, queue-max-cap) - consumed)
*/
Resource userLimit = // User limit
computeUserLimit(application, clusterResource, required);
Resource queueMaxCap = // Queue Max-Capacity
Resources.createResource(
roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory()))
);
Resource userConsumed = getUser(user).getConsumedResources();
Resource headroom = Resource headroom =
Resources.subtract(limit, getUser(user).getConsumedResources()); Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
" queueMaxCap=" + queueMaxCap +
" consumed=" + userConsumed +
" headroom=" + headroom);
}
application.setHeadroom(headroom); application.setHeadroom(headroom);
metrics.setAvailableResourcesToUser(user, headroom); metrics.setAvailableResourcesToUser(user, headroom);
return limit;
return userLimit;
} }
private int roundUp(int memory) { private int roundUp(int memory) {
return divideAndCeil(memory, minimumAllocation.getMemory()) * int minMemory = minimumAllocation.getMemory();
minimumAllocation.getMemory(); return divideAndCeil(memory, minMemory) * minMemory;
}
private int roundDown(int memory) {
int minMemory = minimumAllocation.getMemory();
return (memory / minMemory) * minMemory;
} }
@Lock(NoLock.class) @Lock(NoLock.class)
@ -1288,10 +1318,17 @@ synchronized void allocateResource(Resource clusterResource,
String userName = application.getUser(); String userName = application.getUser();
User user = getUser(userName); User user = getUser(userName);
user.assignContainer(resource); user.assignContainer(resource);
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
if (LOG.isDebugEnabled()) {
LOG.info(getQueueName() + LOG.info(getQueueName() +
" user=" + userName +
" used=" + usedResources + " numContainers=" + numContainers + " used=" + usedResources + " numContainers=" + numContainers +
" user=" + userName + " user-resources=" + user.getConsumedResources()); " headroom = " + application.getHeadroom() +
" user-resources=" + user.getConsumedResources()
);
}
} }
synchronized void releaseResource(Resource clusterResource, synchronized void releaseResource(Resource clusterResource,
@ -1325,8 +1362,8 @@ public synchronized void updateClusterResource(Resource clusterResource) {
// Update application properties // Update application properties
for (SchedulerApp application : activeApplications) { for (SchedulerApp application : activeApplications) {
synchronized (application) { synchronized (application) {
computeAndSetUserResourceLimit( computeUserLimitAndSetHeadroom(application, clusterResource,
application, clusterResource, Resources.none()); Resources.none());
} }
} }
} }

View File

@ -38,6 +38,8 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
@ -67,6 +69,8 @@
public class TestLeafQueue { public class TestLeafQueue {
private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
@ -472,6 +476,115 @@ public void testUserLimits() throws Exception {
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
} }
@Test
public void testHeadroomWithMaxCap() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_0, user_0, A);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_1, user_0, A); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_2 =
new SchedulerApp(appAttemptId_2, user_1, a,
a.getActiveUsersManager(), rmContext, null);
a.submitApplication(app_2, user_1, A);
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
String host_1 = "host_1";
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
recordFactory)));
/**
* Start testing...
*/
// Set user-limit
a.setUserLimit(50);
a.setUserLimitFactor(2);
// Now, only user_0 should be active since he is the only one with
// outstanding requests
assertEquals("There should only be 1 active user!",
1, a.getActiveUsersManager().getNumActiveUsers());
// 1 container to user_0
a.assignContainers(clusterResource, node_0);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_0.getHeadroom().getMemory());
assertEquals(0*GB, app_1.getHeadroom().getMemory());
// Check headroom for app_2
LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority,
recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@Test @Test
public void testSingleQueueWithMultipleUsers() throws Exception { public void testSingleQueueWithMultipleUsers() throws Exception {

View File

@ -86,7 +86,7 @@ private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
private SchedulerApp getMockApplication(int appId, String user) { private SchedulerApp getMockApplication(int appId, String user) {
SchedulerApp application = mock(SchedulerApp.class); SchedulerApp application = mock(SchedulerApp.class);
doReturn(user).when(application).getUser(); doReturn(user).when(application).getUser();
doReturn(null).when(application).getHeadroom(); doReturn(Resources.createResource(0)).when(application).getHeadroom();
return application; return application;
} }