Merge -c 1235103 from trunk to branch-0.23 to fix MAPREDUCE-3681. Fixed computation of queue's usedCapacity.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1235104 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d6ebd72325
commit
439446057d
|
@ -493,6 +493,8 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3646. Remove redundant URL info from "mapred job" output.
|
MAPREDUCE-3646. Remove redundant URL info from "mapred job" output.
|
||||||
(Jonathan Eagles via mahadev)
|
(Jonathan Eagles via mahadev)
|
||||||
|
|
||||||
|
MAPREDUCE-3681. Fixed computation of queue's usedCapacity. (acmurthy)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -272,9 +272,9 @@ public class LeafQueue implements CSQueue {
|
||||||
"maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser +
|
"maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser +
|
||||||
" [= (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor) ]" + "\n" +
|
" [= (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor) ]" + "\n" +
|
||||||
"utilization = " + utilization +
|
"utilization = " + utilization +
|
||||||
" [= usedResourcesMemory / queueLimit ]" + "\n" +
|
" [= usedResourcesMemory / (clusterResourceMemory * absoluteCapacity)]" + "\n" +
|
||||||
"usedCapacity = " + usedCapacity +
|
"usedCapacity = " + usedCapacity +
|
||||||
" [= usedResourcesMemory / (clusterResourceMemory * capacity) ]" + "\n" +
|
" [= usedResourcesMemory / (clusterResourceMemory * parent.absoluteCapacity)]" + "\n" +
|
||||||
"maxAMResourcePercent = " + maxAMResourcePercent +
|
"maxAMResourcePercent = " + maxAMResourcePercent +
|
||||||
" [= configuredMaximumAMResourcePercent ]" + "\n" +
|
" [= configuredMaximumAMResourcePercent ]" + "\n" +
|
||||||
"minimumAllocationFactor = " + minimumAllocationFactor +
|
"minimumAllocationFactor = " + minimumAllocationFactor +
|
||||||
|
@ -502,9 +502,14 @@ public class LeafQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return queueName + ":" + capacity + ":" + absoluteCapacity + ":" +
|
return queueName + ": " +
|
||||||
getUsedCapacity() + ":" + getUtilization() + ":" +
|
"capacity=" + capacity + ", " +
|
||||||
getNumApplications() + ":" + getNumContainers();
|
"absoluteCapacity=" + absoluteCapacity + ", " +
|
||||||
|
"usedResources=" + usedResources.getMemory() + "MB, " +
|
||||||
|
"usedCapacity=" + getUsedCapacity() + ", " +
|
||||||
|
"utilization=" + getUtilization() + ", " +
|
||||||
|
"numApps=" + getNumApplications() + ", " +
|
||||||
|
"numContainers=" + getNumContainers();
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized User getUser(String userName) {
|
private synchronized User getUser(String userName) {
|
||||||
|
@ -1318,8 +1323,8 @@ public class LeafQueue implements CSQueue {
|
||||||
private synchronized void updateResource(Resource clusterResource) {
|
private synchronized void updateResource(Resource clusterResource) {
|
||||||
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
|
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
|
||||||
setUtilization(usedResources.getMemory() / queueLimit);
|
setUtilization(usedResources.getMemory() / queueLimit);
|
||||||
setUsedCapacity(
|
setUsedCapacity(usedResources.getMemory()
|
||||||
usedResources.getMemory() / (clusterResource.getMemory() * capacity));
|
/ (clusterResource.getMemory() * parent.getAbsoluteCapacity()));
|
||||||
|
|
||||||
Resource resourceLimit =
|
Resource resourceLimit =
|
||||||
Resources.createResource(roundUp((int)queueLimit));
|
Resources.createResource(roundUp((int)queueLimit));
|
||||||
|
|
|
@ -333,10 +333,15 @@ public class ParentQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return queueName + ":" + capacity + ":" + absoluteCapacity + ":" +
|
return queueName + ": " +
|
||||||
getUsedCapacity() + ":" + getUtilization() + ":" +
|
"numChildQueue= " + childQueues.size() + ", " +
|
||||||
getNumApplications() + ":" + getNumContainers() + ":" +
|
"capacity=" + capacity + ", " +
|
||||||
childQueues.size() + " child-queues";
|
"absoluteCapacity=" + absoluteCapacity + ", " +
|
||||||
|
"usedResources=" + usedResources.getMemory() + "MB, " +
|
||||||
|
"usedCapacity=" + getUsedCapacity() + ", " +
|
||||||
|
"utilization=" + getUtilization() + ", " +
|
||||||
|
"numApps=" + getNumApplications() + ", " +
|
||||||
|
"numContainers=" + getNumContainers();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -689,9 +694,11 @@ public class ParentQueue implements CSQueue {
|
||||||
|
|
||||||
private synchronized void updateResource(Resource clusterResource) {
|
private synchronized void updateResource(Resource clusterResource) {
|
||||||
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
|
float queueLimit = clusterResource.getMemory() * absoluteCapacity;
|
||||||
|
float parentAbsoluteCapacity =
|
||||||
|
(rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
|
||||||
setUtilization(usedResources.getMemory() / queueLimit);
|
setUtilization(usedResources.getMemory() / queueLimit);
|
||||||
setUsedCapacity(
|
setUsedCapacity(usedResources.getMemory()
|
||||||
usedResources.getMemory() / (clusterResource.getMemory() * capacity));
|
/ (clusterResource.getMemory() * parentAbsoluteCapacity));
|
||||||
|
|
||||||
Resource resourceLimit =
|
Resource resourceLimit =
|
||||||
Resources.createResource((int)queueLimit);
|
Resources.createResource((int)queueLimit);
|
||||||
|
|
|
@ -138,12 +138,34 @@ public class TestParentQueue {
|
||||||
when(queue).assignContainers(eq(clusterResource), eq(node));
|
when(queue).assignContainers(eq(clusterResource), eq(node));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private float computeQueueUsedCapacity(CSQueue queue,
|
||||||
|
int expectedMemory, Resource clusterResource) {
|
||||||
|
return (
|
||||||
|
((float)expectedMemory / clusterResource.getMemory()) *
|
||||||
|
queue.getParent().getAbsoluteCapacity()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private float computeQueueUtilization(CSQueue queue,
|
private float computeQueueUtilization(CSQueue queue,
|
||||||
int expectedMemory, Resource clusterResource) {
|
int expectedMemory, Resource clusterResource) {
|
||||||
return (expectedMemory /
|
return (expectedMemory /
|
||||||
(clusterResource.getMemory() * queue.getAbsoluteCapacity()));
|
(clusterResource.getMemory() * queue.getAbsoluteCapacity()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final static float DELTA = 0.0001f;
|
||||||
|
private void verifyQueueMetrics(CSQueue queue,
|
||||||
|
int expectedMemory, Resource clusterResource) {
|
||||||
|
assertEquals(
|
||||||
|
computeQueueUtilization(queue, expectedMemory, clusterResource),
|
||||||
|
queue.getUtilization(),
|
||||||
|
DELTA);
|
||||||
|
assertEquals(
|
||||||
|
computeQueueUsedCapacity(queue, expectedMemory, clusterResource),
|
||||||
|
queue.getUsedCapacity(),
|
||||||
|
DELTA);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleLevelQueues() throws Exception {
|
public void testSingleLevelQueues() throws Exception {
|
||||||
// Setup queue configs
|
// Setup queue configs
|
||||||
|
@ -173,15 +195,13 @@ public class TestParentQueue {
|
||||||
// Start testing
|
// Start testing
|
||||||
LeafQueue a = (LeafQueue)queues.get(A);
|
LeafQueue a = (LeafQueue)queues.get(A);
|
||||||
LeafQueue b = (LeafQueue)queues.get(B);
|
LeafQueue b = (LeafQueue)queues.get(B);
|
||||||
final float delta = 0.0001f;
|
|
||||||
|
|
||||||
// Simulate B returning a container on node_0
|
// Simulate B returning a container on node_0
|
||||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||||
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
|
||||||
root.assignContainers(clusterResource, node_0);
|
root.assignContainers(clusterResource, node_0);
|
||||||
assertEquals(0.0f, a.getUtilization(), delta);
|
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 1*GB, clusterResource),
|
verifyQueueMetrics(b, 1*GB, clusterResource);
|
||||||
b.getUtilization(), delta);
|
|
||||||
|
|
||||||
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
|
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
|
||||||
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
|
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
|
||||||
|
@ -192,10 +212,8 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 2*GB, clusterResource),
|
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 2*GB, clusterResource),
|
|
||||||
b.getUtilization(), delta);
|
|
||||||
|
|
||||||
// Now, B should get the scheduling opportunity
|
// Now, B should get the scheduling opportunity
|
||||||
// since A has 2/6G while B has 2/14G
|
// since A has 2/6G while B has 2/14G
|
||||||
|
@ -207,10 +225,8 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
|
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 4*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
|
|
||||||
b.getUtilization(), delta);
|
|
||||||
|
|
||||||
// Now, B should still get the scheduling opportunity
|
// Now, B should still get the scheduling opportunity
|
||||||
// since A has 3/6G while B has 4/14G
|
// since A has 3/6G while B has 4/14G
|
||||||
|
@ -222,10 +238,8 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
|
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 8*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 8*GB, clusterResource),
|
|
||||||
b.getUtilization(), delta);
|
|
||||||
|
|
||||||
// Now, A should get the scheduling opportunity
|
// Now, A should get the scheduling opportunity
|
||||||
// since A has 3/6G while B has 8/14G
|
// since A has 3/6G while B has 8/14G
|
||||||
|
@ -237,10 +251,8 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 4*GB, clusterResource),
|
verifyQueueMetrics(a, 4*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 9*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 9*GB, clusterResource),
|
|
||||||
b.getUtilization(), delta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final String C = "c";
|
private static final String C = "c";
|
||||||
|
@ -323,22 +335,16 @@ public class TestParentQueue {
|
||||||
CSQueue b2 = queues.get(B2);
|
CSQueue b2 = queues.get(B2);
|
||||||
CSQueue b3 = queues.get(B3);
|
CSQueue b3 = queues.get(B3);
|
||||||
|
|
||||||
final float delta = 0.0001f;
|
|
||||||
|
|
||||||
// Simulate C returning a container on node_0
|
// Simulate C returning a container on node_0
|
||||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
|
||||||
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
|
||||||
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
|
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
|
||||||
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
|
||||||
root.assignContainers(clusterResource, node_0);
|
root.assignContainers(clusterResource, node_0);
|
||||||
assertEquals(computeQueueUtilization(a, 0*GB, clusterResource),
|
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 0*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 0*GB, clusterResource),
|
verifyQueueMetrics(c, 1*GB, clusterResource);
|
||||||
b.getUtilization(), delta);
|
verifyQueueMetrics(d, 0*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(c, 1*GB, clusterResource),
|
|
||||||
c.getUtilization(), delta);
|
|
||||||
assertEquals(computeQueueUtilization(d, 0*GB, clusterResource),
|
|
||||||
d.getUtilization(), delta);
|
|
||||||
reset(a); reset(b); reset(c);
|
reset(a); reset(b); reset(c);
|
||||||
|
|
||||||
// Now get B2 to allocate
|
// Now get B2 to allocate
|
||||||
|
@ -347,12 +353,9 @@ public class TestParentQueue {
|
||||||
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
|
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
|
||||||
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
|
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
|
||||||
root.assignContainers(clusterResource, node_1);
|
root.assignContainers(clusterResource, node_1);
|
||||||
assertEquals(computeQueueUtilization(a, 0*GB, clusterResource),
|
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 4*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
|
verifyQueueMetrics(c, 1*GB, clusterResource);
|
||||||
b.getUtilization(), delta);
|
|
||||||
assertEquals(computeQueueUtilization(c, 1*GB, clusterResource),
|
|
||||||
c.getUtilization(), delta);
|
|
||||||
reset(a); reset(b); reset(c);
|
reset(a); reset(b); reset(c);
|
||||||
|
|
||||||
// Now get both A1, C & B3 to allocate in right order
|
// Now get both A1, C & B3 to allocate in right order
|
||||||
|
@ -368,12 +371,9 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 1*GB, clusterResource),
|
verifyQueueMetrics(a, 1*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 6*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 6*GB, clusterResource),
|
verifyQueueMetrics(c, 3*GB, clusterResource);
|
||||||
b.getUtilization(), delta);
|
|
||||||
assertEquals(computeQueueUtilization(c, 3*GB, clusterResource),
|
|
||||||
c.getUtilization(), delta);
|
|
||||||
reset(a); reset(b); reset(c);
|
reset(a); reset(b); reset(c);
|
||||||
|
|
||||||
// Now verify max-capacity
|
// Now verify max-capacity
|
||||||
|
@ -399,14 +399,10 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(c).assignContainers(eq(clusterResource),
|
allocationOrder.verify(c).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 3*GB, clusterResource),
|
verifyQueueMetrics(a, 3*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 8*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 8*GB, clusterResource),
|
verifyQueueMetrics(c, 4*GB, clusterResource);
|
||||||
b.getUtilization(), delta);
|
|
||||||
assertEquals(computeQueueUtilization(c, 4*GB, clusterResource),
|
|
||||||
c.getUtilization(), delta);
|
|
||||||
reset(a); reset(b); reset(c);
|
reset(a); reset(b); reset(c);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -438,15 +434,13 @@ public class TestParentQueue {
|
||||||
// Start testing
|
// Start testing
|
||||||
LeafQueue a = (LeafQueue)queues.get(A);
|
LeafQueue a = (LeafQueue)queues.get(A);
|
||||||
LeafQueue b = (LeafQueue)queues.get(B);
|
LeafQueue b = (LeafQueue)queues.get(B);
|
||||||
final float delta = 0.0001f;
|
|
||||||
|
|
||||||
// Simulate B returning a container on node_0
|
// Simulate B returning a container on node_0
|
||||||
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
|
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
|
||||||
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
|
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
|
||||||
root.assignContainers(clusterResource, node_0);
|
root.assignContainers(clusterResource, node_0);
|
||||||
assertEquals(0.0f, a.getUtilization(), delta);
|
verifyQueueMetrics(a, 0*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 1*GB, clusterResource),
|
verifyQueueMetrics(b, 1*GB, clusterResource);
|
||||||
b.getUtilization(), delta);
|
|
||||||
|
|
||||||
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
|
// Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
|
||||||
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
|
// also, B gets a scheduling opportunity since A allocates RACK_LOCAL
|
||||||
|
@ -458,10 +452,8 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
allocationOrder.verify(b).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 2*GB, clusterResource),
|
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 2*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 2*GB, clusterResource),
|
|
||||||
b.getUtilization(), delta);
|
|
||||||
|
|
||||||
// Now, B should get the scheduling opportunity
|
// Now, B should get the scheduling opportunity
|
||||||
// since A has 2/6G while B has 2/14G,
|
// since A has 2/6G while B has 2/14G,
|
||||||
|
@ -474,10 +466,8 @@ public class TestParentQueue {
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
allocationOrder.verify(a).assignContainers(eq(clusterResource),
|
||||||
any(SchedulerNode.class));
|
any(SchedulerNode.class));
|
||||||
assertEquals(computeQueueUtilization(a, 2*GB, clusterResource),
|
verifyQueueMetrics(a, 2*GB, clusterResource);
|
||||||
a.getUtilization(), delta);
|
verifyQueueMetrics(b, 4*GB, clusterResource);
|
||||||
assertEquals(computeQueueUtilization(b, 4*GB, clusterResource),
|
|
||||||
b.getUtilization(), delta);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue