MAPREDUCE-2794. [MR-279] Incorrect metrics value for AvailableGB per queue per user. (John George via mahadev) - Merging r1179936 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1179937 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-10-07 05:34:51 +00:00
parent 2459b72cd9
commit 02bc9ecf9b
4 changed files with 76 additions and 11 deletions

View File

@ -1497,6 +1497,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2913. Fixed TestMRJobs.testFailingMapper to assert the correct MAPREDUCE-2913. Fixed TestMRJobs.testFailingMapper to assert the correct
TaskCompletionEventStatus. (Jonathan Eagles via vinodkv) TaskCompletionEventStatus. (Jonathan Eagles via vinodkv)
MAPREDUCE-2794. [MR-279] Incorrect metrics value for AvailableGB per
queue per user. (John George via mahadev)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -740,7 +740,7 @@ public class LeafQueue implements CSQueue {
// Book-keeping // Book-keeping
allocateResource(clusterResource, allocateResource(clusterResource,
application.getUser(), assignedResource); application, assignedResource);
// Reset scheduling opportunities // Reset scheduling opportunities
application.resetSchedulingOpportunities(priority); application.resetSchedulingOpportunities(priority);
@ -810,7 +810,7 @@ public class LeafQueue implements CSQueue {
private void setUserResourceLimit(SchedulerApp application, private void setUserResourceLimit(SchedulerApp application,
Resource resourceLimit) { Resource resourceLimit) {
application.setAvailableResourceLimit(resourceLimit); application.setAvailableResourceLimit(resourceLimit);
metrics.setAvailableResourcesToUser(application.getUser(), resourceLimit); metrics.setAvailableResourcesToUser(application.getUser(), application.getHeadroom());
} }
private int roundUp(int memory) { private int roundUp(int memory) {
@ -1216,7 +1216,7 @@ public class LeafQueue implements CSQueue {
// Book-keeping // Book-keeping
releaseResource(clusterResource, releaseResource(clusterResource,
application.getUser(), container.getResource()); application, container.getResource());
LOG.info("completedContainer" + LOG.info("completedContainer" +
" container=" + container + " container=" + container +
@ -1234,32 +1234,35 @@ public class LeafQueue implements CSQueue {
} }
synchronized void allocateResource(Resource clusterResource, synchronized void allocateResource(Resource clusterResource,
String userName, Resource resource) { SchedulerApp application, Resource resource) {
// Update queue metrics // Update queue metrics
Resources.addTo(usedResources, resource); Resources.addTo(usedResources, resource);
updateResource(clusterResource); updateResource(clusterResource);
++numContainers; ++numContainers;
// Update user metrics // Update user metrics
String userName = application.getUser();
User user = getUser(userName); User user = getUser(userName);
user.assignContainer(resource); user.assignContainer(resource);
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
LOG.info(getQueueName() + LOG.info(getQueueName() +
" used=" + usedResources + " numContainers=" + numContainers + " used=" + usedResources + " numContainers=" + numContainers +
" user=" + userName + " resources=" + user.getConsumedResources()); " user=" + userName + " resources=" + user.getConsumedResources());
} }
synchronized void releaseResource(Resource clusterResource, synchronized void releaseResource(Resource clusterResource,
String userName, Resource resource) { SchedulerApp application, Resource resource) {
// Update queue metrics // Update queue metrics
Resources.subtractFrom(usedResources, resource); Resources.subtractFrom(usedResources, resource);
updateResource(clusterResource); updateResource(clusterResource);
--numContainers; --numContainers;
// Update user metrics // Update user metrics
String userName = application.getUser();
User user = getUser(userName); User user = getUser(userName);
user.releaseContainer(resource); user.releaseContainer(resource);
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
LOG.info(getQueueName() + LOG.info(getQueueName() +
" used=" + usedResources + " numContainers=" + numContainers + " used=" + usedResources + " numContainers=" + numContainers +
" user=" + userName + " resources=" + user.getConsumedResources()); " user=" + userName + " resources=" + user.getConsumedResources());
@ -1282,9 +1285,9 @@ public class LeafQueue implements CSQueue {
usedResources.getMemory() / (clusterResource.getMemory() * capacity)); usedResources.getMemory() / (clusterResource.getMemory() * capacity));
Resource resourceLimit = Resource resourceLimit =
Resources.createResource((int)queueLimit); Resources.createResource(roundUp((int)queueLimit));
metrics.setAvailableResourcesToQueue( metrics.setAvailableResourcesToQueue(
Resources.subtractFrom(resourceLimit, usedResources)); Resources.subtractFrom(resourceLimit, usedResources));
} }
@Override @Override
@ -1340,7 +1343,7 @@ public class LeafQueue implements CSQueue {
SchedulerApp application, Container container) { SchedulerApp application, Container container) {
// Careful! Locking order is important! // Careful! Locking order is important!
synchronized (this) { synchronized (this) {
allocateResource(clusterResource, application.getUser(), container.getResource()); allocateResource(clusterResource, application, container.getResource());
} }
parent.recoverContainer(clusterResource, application, container); parent.recoverContainer(clusterResource, application, container);

View File

@ -158,6 +158,52 @@ public class TestLeafQueue {
return queue; return queue;
} }
@Test
public void testSingleQueueOneUserMetrics() throws Exception {
// Manipulate queue 'a'
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(B));
// Users
final String user_0 = "user_0";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
SchedulerApp app_0 =
new SchedulerApp(appAttemptId_0, user_0, a, rmContext, null);
a.submitApplication(app_0, user_0, B);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
SchedulerApp app_1 =
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
a.submitApplication(app_1, user_0, B); // same user
// Setup some nodes
String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 1;
Resource clusterResource = Resources.createResource(numNodes * (8*GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority,
recordFactory)));
// Start testing...
// Only 1 container
a.assignContainers(clusterResource, node_0);
assertEquals(7, a.getMetrics().getAvailableGB());
}
@Test @Test
public void testSingleQueueWithOneUser() throws Exception { public void testSingleQueueWithOneUser() throws Exception {
@ -180,6 +226,7 @@ public class TestLeafQueue {
new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null); new SchedulerApp(appAttemptId_1, user_0, a, rmContext, null);
a.submitApplication(app_1, user_0, A); // same user a.submitApplication(app_1, user_0, A); // same user
// Setup some nodes // Setup some nodes
String host_0 = "host_0"; String host_0 = "host_0";
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
@ -207,6 +254,7 @@ public class TestLeafQueue {
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB()); assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(1, a.getMetrics().getAllocatedGB()); assertEquals(1, a.getMetrics().getAllocatedGB());
assertEquals(0, a.getMetrics().getAvailableGB());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit // you can get one container more than user-limit
@ -273,6 +321,7 @@ public class TestLeafQueue {
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB()); assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(0, a.getMetrics().getAllocatedGB()); assertEquals(0, a.getMetrics().getAllocatedGB());
assertEquals(1, a.getMetrics().getAvailableGB());
} }
@Test @Test
@ -494,6 +543,7 @@ public class TestLeafQueue {
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory()); assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
assertEquals(0, a.getMetrics().getReservedGB()); assertEquals(0, a.getMetrics().getReservedGB());
assertEquals(1, a.getMetrics().getAllocatedGB()); assertEquals(1, a.getMetrics().getAllocatedGB());
assertEquals(0, a.getMetrics().getAvailableGB());
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit // you can get one container more than user-limit

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.junit.After; import org.junit.After;
@ -81,6 +82,13 @@ public class TestParentQueue {
LOG.info("Setup top-level queues a and b"); LOG.info("Setup top-level queues a and b");
} }
private SchedulerApp getMockApplication(int appId, String user) {
SchedulerApp application = mock(SchedulerApp.class);
doReturn(user).when(application).getUser();
doReturn(null).when(application).getHeadroom();
return application;
}
private void stubQueueAllocation(final CSQueue queue, private void stubQueueAllocation(final CSQueue queue,
final Resource clusterResource, final SchedulerNode node, final Resource clusterResource, final SchedulerNode node,
final int allocation) { final int allocation) {
@ -100,7 +108,8 @@ public class TestParentQueue {
((ParentQueue)queue).allocateResource(clusterResource, ((ParentQueue)queue).allocateResource(clusterResource,
allocatedResource); allocatedResource);
} else { } else {
((LeafQueue)queue).allocateResource(clusterResource, "", SchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
allocatedResource); allocatedResource);
} }