MAPREDUCE-3713. Fixed the way head-room is allocated to applications by CapacityScheduler so that it deducts current-usage per user and not per-application. Contributed by Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1235989 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2012-01-25 23:31:14 +00:00
parent 1149d9a13d
commit 21c9116309
5 changed files with 102 additions and 47 deletions

View File

@ -564,6 +564,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3683. Fixed maxCapacity of queues to be product of parent MAPREDUCE-3683. Fixed maxCapacity of queues to be product of parent
maxCapacities. (acmurthy) maxCapacities. (acmurthy)
MAPREDUCE-3713. Fixed the way head-room is allocated to applications by
CapacityScheduler so that it deducts current-usage per user and not
per-application. (Arun C Murthy via vinodkv)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -295,10 +295,6 @@ public class SchedulerApp {
} }
} }
public synchronized void setAvailableResourceLimit(Resource globalLimit) {
this.resourceLimit = globalLimit;
}
public synchronized RMContainer getRMContainer(ContainerId id) { public synchronized RMContainer getRMContainer(ContainerId id) {
return liveContainers.get(id); return liveContainers.get(id);
} }
@ -446,20 +442,21 @@ public class SchedulerApp {
return reservedContainers; return reservedContainers;
} }
public synchronized void setHeadroom(Resource globalLimit) {
this.resourceLimit = globalLimit;
}
/** /**
* Get available headroom in terms of resources for the application's user. * Get available headroom in terms of resources for the application's user.
* @return available resource headroom * @return available resource headroom
*/ */
public synchronized Resource getHeadroom() { public synchronized Resource getHeadroom() {
Resource limit = Resources.subtract(resourceLimit, currentConsumption);
Resources.subtractFrom(limit, currentReservation);
// Corner case to deal with applications being slightly over-limit // Corner case to deal with applications being slightly over-limit
if (limit.getMemory() < 0) { if (resourceLimit.getMemory() < 0) {
limit.setMemory(0); resourceLimit.setMemory(0);
} }
return limit; return resourceLimit;
} }
public Queue getQueue() { public Queue getQueue() {

View File

@ -720,12 +720,11 @@ public class LeafQueue implements CSQueue {
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("pre-assignContainers for application " LOG.debug("pre-assignContainers for application "
+ application.getApplicationId()); + application.getApplicationId());
application.showRequests();
} }
application.showRequests();
synchronized (application) { synchronized (application) {
computeAndSetUserResourceLimit(application, clusterResource); // Schedule in priority order
for (Priority priority : application.getPriorities()) { for (Priority priority : application.getPriorities()) {
// Required resource // Required resource
Resource required = Resource required =
@ -736,15 +735,21 @@ public class LeafQueue implements CSQueue {
continue; continue;
} }
// Are we going over limits by allocating to this application? // Compute & set headroom
// Maximum Capacity of the queue // Note: We set the headroom with the highest priority request
// as the target.
// This works since we never assign lower priority requests
// before all higher priority ones are serviced.
Resource userLimit =
computeAndSetUserResourceLimit(application, clusterResource,
required);
// Check queue max-capacity limit
if (!assignToQueue(clusterResource, required)) { if (!assignToQueue(clusterResource, required)) {
return NULL_ASSIGNMENT; return NULL_ASSIGNMENT;
} }
// User limits // Check user limit
Resource userLimit =
computeUserLimit(application, clusterResource, required);
if (!assignToUser(application.getUser(), userLimit)) { if (!assignToUser(application.getUser(), userLimit)) {
break; break;
} }
@ -832,13 +837,15 @@ public class LeafQueue implements CSQueue {
return true; return true;
} }
private void computeAndSetUserResourceLimit(SchedulerApp application, private Resource computeAndSetUserResourceLimit(SchedulerApp application,
Resource clusterResource) { Resource clusterResource, Resource required) {
Resource userLimit = String user = application.getUser();
computeUserLimit(application, clusterResource, Resources.none()); Resource limit = computeUserLimit(application, clusterResource, required);
application.setAvailableResourceLimit(userLimit); Resource headroom =
metrics.setAvailableResourcesToUser(application.getUser(), Resources.subtract(limit, getUser(user).getConsumedResources());
application.getHeadroom()); application.setHeadroom(headroom);
metrics.setAvailableResourcesToUser(user, headroom);
return limit;
} }
private int roundUp(int memory) { private int roundUp(int memory) {
@ -909,7 +916,7 @@ public class LeafQueue implements CSQueue {
User user = getUser(userName); User user = getUser(userName);
// Note: We aren't considering the current request since there is a fixed // Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a >= check, so... // overhead of the AM, but it's a > check, not a >= check, so...
if ((user.getConsumedResources().getMemory()) > limit.getMemory()) { if ((user.getConsumedResources().getMemory()) > limit.getMemory()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName() + LOG.debug("User " + userName + " in queue " + getQueueName() +
@ -1227,8 +1234,8 @@ public class LeafQueue implements CSQueue {
// happen under scheduler's lock... // happen under scheduler's lock...
// So, this is, in effect, a transaction across application & node // So, this is, in effect, a transaction across application & node
if (rmContainer.getState() == RMContainerState.RESERVED) { if (rmContainer.getState() == RMContainerState.RESERVED) {
application.unreserve(node, rmContainer.getReservedPriority()); unreserve(application, rmContainer.getReservedPriority(),
node.unreserveResource(application); node, rmContainer);
} else { } else {
application.containerCompleted(rmContainer, containerStatus, event); application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container); node.releaseContainer(container);
@ -1301,7 +1308,8 @@ public class LeafQueue implements CSQueue {
// Update application properties // Update application properties
for (SchedulerApp application : activeApplications) { for (SchedulerApp application : activeApplications) {
computeAndSetUserResourceLimit(application, clusterResource); computeAndSetUserResourceLimit(
application, clusterResource, Resources.none());
} }
} }

View File

@ -358,7 +358,7 @@ public class FifoScheduler implements ResourceScheduler {
} }
} }
application.setAvailableResourceLimit(clusterResource); application.setHeadroom(clusterResource);
LOG.debug("post-assignContainers"); LOG.debug("post-assignContainers");
application.showRequests(); application.showRequests();

View File

@ -21,16 +21,24 @@ import static org.junit.Assert.*;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.junit.After; import org.junit.After;
@ -283,38 +291,76 @@ public class TestApplicationLimits {
final String user_0 = "user_0"; final String user_0 = "user_0";
final String user_1 = "user_1"; final String user_1 = "user_1";
int APPLICATION_ID = 0; RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
RMContext rmContext = TestUtils.getMockRMContext();
// Submit first application from user_0, check headroom Priority priority_1 = TestUtils.createMockPriority(1);
SchedulerApp app_0_0 = getMockApplication(APPLICATION_ID++, user_0);
// Submit first application with some resource-requests from user_0,
// and check headroom
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0_0 =
spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, rmContext, null));
queue.submitApplication(app_0_0, user_0, A); queue.submitApplication(app_0_0, user_0, A);
queue.assignContainers(clusterResource, node_0); // Schedule to compute
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
app_0_0_requests.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
priority_1, recordFactory));
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0);
Resource expectedHeadroom = Resources.createResource(10*16*GB); Resource expectedHeadroom = Resources.createResource(10*16*GB);
verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_0_0).setHeadroom(eq(expectedHeadroom));
// Submit second application from user_0, check headroom // Submit second application from user_0, check headroom
SchedulerApp app_0_1 = getMockApplication(APPLICATION_ID++, user_0); final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_0_1 =
spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, rmContext, null));
queue.submitApplication(app_0_1, user_0, A); queue.submitApplication(app_0_1, user_0, A);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
priority_1, recordFactory));
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute
verify(app_0_0, times(2)).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));// no change verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
// Submit first application from user_1, check for new headroom // Submit first application from user_1, check for new headroom
SchedulerApp app_1_0 = getMockApplication(APPLICATION_ID++, user_1); final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
SchedulerApp app_1_0 =
spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, rmContext, null));
queue.submitApplication(app_1_0, user_1, A); queue.submitApplication(app_1_0, user_1, A);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
priority_1, recordFactory));
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
queue.assignContainers(clusterResource, node_0); // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2); // changes expectedHeadroom = Resources.createResource(10*16*GB / 2); // changes
verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom));
verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_1_0).setHeadroom(eq(expectedHeadroom));
// Now reduce cluster size and check for the smaller headroom // Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB); clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0); // Schedule to compute queue.assignContainers(clusterResource, node_0); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2); // changes expectedHeadroom = Resources.createResource(9*16*GB / 2); // changes
verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_0_0).setHeadroom(eq(expectedHeadroom));
verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_0_1).setHeadroom(eq(expectedHeadroom));
verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom)); verify(app_1_0).setHeadroom(eq(expectedHeadroom));
} }