Merge -c 1235858 from trunk to branch-0.23 to fix MAPREDUCE-3683. Fixed maxCapacity of queues to be product of parent maxCapacities.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1235860 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ac9e1a8103
commit
8507d72465
|
@ -513,6 +513,9 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3630. Fixes a NullPointer exception while running TeraGen - if a
|
MAPREDUCE-3630. Fixes a NullPointer exception while running TeraGen - if a
|
||||||
map is asked to generate 0 records. (Mahadev Konar via sseth)
|
map is asked to generate 0 records. (Mahadev Konar via sseth)
|
||||||
|
|
||||||
|
MAPREDUCE-3683. Fixed maxCapacity of queues to be product of parent
|
||||||
|
maxCapacities. (acmurthy)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -17,12 +17,19 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
class CSQueueUtils {
|
class CSQueueUtils {
|
||||||
|
|
||||||
public static void checkMaxCapacity(String queueName,
|
public static void checkMaxCapacity(String queueName,
|
||||||
float capacity, float maximumCapacity) {
|
float capacity, float maximumCapacity) {
|
||||||
if (Math.round(100 * maximumCapacity) != CapacitySchedulerConfiguration.UNDEFINED &&
|
if (maximumCapacity < 0.0f || maximumCapacity > 1.0f ||
|
||||||
maximumCapacity < capacity) {
|
maximumCapacity < capacity) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Illegal value of maximumCapacity " + maximumCapacity +
|
||||||
|
" used in call to setMaxCapacity for queue " + queueName);
|
||||||
|
}
|
||||||
|
if (maximumCapacity < capacity) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Illegal call to setMaxCapacity. " +
|
"Illegal call to setMaxCapacity. " +
|
||||||
"Queue '" + queueName + "' has " +
|
"Queue '" + queueName + "' has " +
|
||||||
|
@ -31,4 +38,25 @@ class CSQueueUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static float computeAbsoluteMaximumCapacity(
|
||||||
|
float maximumCapacity, CSQueue parent) {
|
||||||
|
float parentAbsMaxCapacity =
|
||||||
|
(parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity();
|
||||||
|
return (parentAbsMaxCapacity * maximumCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int computeMaxActiveApplications(Resource clusterResource,
|
||||||
|
float maxAMResourcePercent, float absoluteCapacity) {
|
||||||
|
return
|
||||||
|
Math.max(
|
||||||
|
(int)((clusterResource.getMemory() / (float)LeafQueue.DEFAULT_AM_RESOURCE) *
|
||||||
|
maxAMResourcePercent * absoluteCapacity),
|
||||||
|
1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int computeMaxActiveApplicationsPerUser(
|
||||||
|
int maxActiveApplications, int userLimit, float userLimitFactor) {
|
||||||
|
return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,7 +149,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
throw new IllegalArgumentException("Illegal " +
|
throw new IllegalArgumentException("Illegal " +
|
||||||
"capacity of " + capacity + " for queue " + queue);
|
"capacity of " + capacity + " for queue " + queue);
|
||||||
}
|
}
|
||||||
LOG.debug("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
LOG.debug("CSConf - getCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
||||||
", capacity=" + capacity);
|
", capacity=" + capacity);
|
||||||
return capacity;
|
return capacity;
|
||||||
}
|
}
|
||||||
|
@ -162,11 +162,15 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
|
|
||||||
public int getMaximumCapacity(String queue) {
|
public int getMaximumCapacity(String queue) {
|
||||||
int maxCapacity =
|
int maxCapacity =
|
||||||
getInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, UNDEFINED);
|
getInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, MAXIMUM_CAPACITY_VALUE);
|
||||||
return maxCapacity;
|
return maxCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMaximumCapacity(String queue, int maxCapacity) {
|
public void setMaximumCapacity(String queue, int maxCapacity) {
|
||||||
|
if (maxCapacity > MAXIMUM_CAPACITY_VALUE) {
|
||||||
|
throw new IllegalArgumentException("Illegal " +
|
||||||
|
"maximum-capacity of " + maxCapacity + " for queue " + queue);
|
||||||
|
}
|
||||||
setInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
|
setInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
|
||||||
LOG.debug("CSConf - setMaxCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
LOG.debug("CSConf - setMaxCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
||||||
", maxCapacity=" + maxCapacity);
|
", maxCapacity=" + maxCapacity);
|
||||||
|
|
|
@ -144,10 +144,10 @@ public class LeafQueue implements CSQueue {
|
||||||
(float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
|
(float)cs.getConfiguration().getCapacity(getQueuePath()) / 100;
|
||||||
float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
||||||
|
|
||||||
float maximumCapacity = (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
|
float maximumCapacity =
|
||||||
|
(float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
|
||||||
float absoluteMaxCapacity =
|
float absoluteMaxCapacity =
|
||||||
(Math.round(maximumCapacity * 100) == CapacitySchedulerConfiguration.UNDEFINED) ?
|
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
||||||
Float.MAX_VALUE : (parent.getAbsoluteCapacity() * maximumCapacity);
|
|
||||||
|
|
||||||
int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
|
int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
|
||||||
float userLimitFactor =
|
float userLimitFactor =
|
||||||
|
@ -161,10 +161,10 @@ public class LeafQueue implements CSQueue {
|
||||||
this.maxAMResourcePercent =
|
this.maxAMResourcePercent =
|
||||||
cs.getConfiguration().getMaximumApplicationMasterResourcePercent();
|
cs.getConfiguration().getMaximumApplicationMasterResourcePercent();
|
||||||
int maxActiveApplications =
|
int maxActiveApplications =
|
||||||
computeMaxActiveApplications(cs.getClusterResources(),
|
CSQueueUtils.computeMaxActiveApplications(cs.getClusterResources(),
|
||||||
maxAMResourcePercent, absoluteCapacity);
|
maxAMResourcePercent, absoluteCapacity);
|
||||||
int maxActiveApplicationsPerUser =
|
int maxActiveApplicationsPerUser =
|
||||||
computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit,
|
CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit,
|
||||||
userLimitFactor);
|
userLimitFactor);
|
||||||
|
|
||||||
this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||||
|
@ -193,20 +193,6 @@ public class LeafQueue implements CSQueue {
|
||||||
this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
|
this.activeApplications = new TreeSet<SchedulerApp>(applicationComparator);
|
||||||
}
|
}
|
||||||
|
|
||||||
private int computeMaxActiveApplications(Resource clusterResource,
|
|
||||||
float maxAMResourcePercent, float absoluteCapacity) {
|
|
||||||
return
|
|
||||||
Math.max(
|
|
||||||
(int)((clusterResource.getMemory() / (float)DEFAULT_AM_RESOURCE) *
|
|
||||||
maxAMResourcePercent * absoluteCapacity),
|
|
||||||
1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private int computeMaxActiveApplicationsPerUser(int maxActiveApplications,
|
|
||||||
int userLimit, float userLimitFactor) {
|
|
||||||
return (int)(maxActiveApplications * (userLimit / 100.0f) * userLimitFactor);
|
|
||||||
}
|
|
||||||
|
|
||||||
private synchronized void setupQueueConfigs(
|
private synchronized void setupQueueConfigs(
|
||||||
float capacity, float absoluteCapacity,
|
float capacity, float absoluteCapacity,
|
||||||
float maximumCapacity, float absoluteMaxCapacity,
|
float maximumCapacity, float absoluteMaxCapacity,
|
||||||
|
@ -254,8 +240,8 @@ public class LeafQueue implements CSQueue {
|
||||||
"maxCapacity = " + maximumCapacity +
|
"maxCapacity = " + maximumCapacity +
|
||||||
" [= configuredMaxCapacity ]" + "\n" +
|
" [= configuredMaxCapacity ]" + "\n" +
|
||||||
"absoluteMaxCapacity = " + absoluteMaxCapacity +
|
"absoluteMaxCapacity = " + absoluteMaxCapacity +
|
||||||
" [= Float.MAX_VALUE if maximumCapacity undefined, " +
|
" [= 1.0 maximumCapacity undefined, " +
|
||||||
"(parentAbsoluteCapacity * maximumCapacity) / 100 otherwise ]" + "\n" +
|
"(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + "\n" +
|
||||||
"userLimit = " + userLimit +
|
"userLimit = " + userLimit +
|
||||||
" [= configuredUserLimit ]" + "\n" +
|
" [= configuredUserLimit ]" + "\n" +
|
||||||
"userLimitFactor = " + userLimitFactor +
|
"userLimitFactor = " + userLimitFactor +
|
||||||
|
@ -400,9 +386,7 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
this.maximumCapacity = maximumCapacity;
|
this.maximumCapacity = maximumCapacity;
|
||||||
this.absoluteMaxCapacity =
|
this.absoluteMaxCapacity =
|
||||||
(Math.round(maximumCapacity * 100) == CapacitySchedulerConfiguration.UNDEFINED) ?
|
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
||||||
Float.MAX_VALUE :
|
|
||||||
(parent.getAbsoluteCapacity() * maximumCapacity);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -835,13 +819,14 @@ public class LeafQueue implements CSQueue {
|
||||||
float potentialNewCapacity =
|
float potentialNewCapacity =
|
||||||
(float)(usedResources.getMemory() + required.getMemory()) /
|
(float)(usedResources.getMemory() + required.getMemory()) /
|
||||||
clusterResource.getMemory();
|
clusterResource.getMemory();
|
||||||
|
if (potentialNewCapacity > absoluteMaxCapacity) {
|
||||||
LOG.info(getQueueName() +
|
LOG.info(getQueueName() +
|
||||||
" usedResources: " + usedResources.getMemory() +
|
" usedResources: " + usedResources.getMemory() +
|
||||||
|
" clusterResources: " + clusterResource.getMemory() +
|
||||||
" currentCapacity " + ((float)usedResources.getMemory())/clusterResource.getMemory() +
|
" currentCapacity " + ((float)usedResources.getMemory())/clusterResource.getMemory() +
|
||||||
" required " + required.getMemory() +
|
" required " + required.getMemory() +
|
||||||
" potentialNewCapacity: " + potentialNewCapacity + " ( " +
|
" potentialNewCapacity: " + potentialNewCapacity + " ( " +
|
||||||
" max-capacity: " + absoluteMaxCapacity + ")");
|
" max-capacity: " + absoluteMaxCapacity + ")");
|
||||||
if (potentialNewCapacity > absoluteMaxCapacity) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -1308,10 +1293,10 @@ public class LeafQueue implements CSQueue {
|
||||||
public synchronized void updateClusterResource(Resource clusterResource) {
|
public synchronized void updateClusterResource(Resource clusterResource) {
|
||||||
// Update queue properties
|
// Update queue properties
|
||||||
maxActiveApplications =
|
maxActiveApplications =
|
||||||
computeMaxActiveApplications(clusterResource, maxAMResourcePercent,
|
CSQueueUtils.computeMaxActiveApplications(clusterResource, maxAMResourcePercent,
|
||||||
absoluteCapacity);
|
absoluteCapacity);
|
||||||
maxActiveApplicationsPerUser =
|
maxActiveApplicationsPerUser =
|
||||||
computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit,
|
CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit,
|
||||||
userLimitFactor);
|
userLimitFactor);
|
||||||
|
|
||||||
// Update application properties
|
// Update application properties
|
||||||
|
|
|
@ -118,16 +118,14 @@ public class ParentQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
float capacity = (float) rawCapacity / 100;
|
float capacity = (float) rawCapacity / 100;
|
||||||
|
|
||||||
float parentAbsoluteCapacity =
|
float parentAbsoluteCapacity =
|
||||||
(parent == null) ? 1.0f : parent.getAbsoluteCapacity();
|
(rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
|
||||||
float absoluteCapacity = parentAbsoluteCapacity * capacity;
|
float absoluteCapacity = parentAbsoluteCapacity * capacity;
|
||||||
|
|
||||||
float maximumCapacity =
|
float maximumCapacity =
|
||||||
(float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
|
(float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
|
||||||
float absoluteMaxCapacity =
|
float absoluteMaxCapacity =
|
||||||
(Math.round(maximumCapacity * 100) == CapacitySchedulerConfiguration.UNDEFINED) ?
|
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
||||||
Float.MAX_VALUE : (parentAbsoluteCapacity * maximumCapacity);
|
|
||||||
|
|
||||||
QueueState state = cs.getConfiguration().getState(getQueuePath());
|
QueueState state = cs.getConfiguration().getState(getQueuePath());
|
||||||
|
|
||||||
|
@ -497,12 +495,8 @@ public class ParentQueue implements CSQueue {
|
||||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||||
|
|
||||||
this.maximumCapacity = maximumCapacity;
|
this.maximumCapacity = maximumCapacity;
|
||||||
float parentAbsoluteCapacity =
|
|
||||||
(rootQueue) ? 100.0f : parent.getAbsoluteCapacity();
|
|
||||||
this.absoluteMaxCapacity =
|
this.absoluteMaxCapacity =
|
||||||
(maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ?
|
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
||||||
Float.MAX_VALUE :
|
|
||||||
(parentAbsoluteCapacity * maximumCapacity);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -255,7 +255,7 @@ public class TestLeafQueue {
|
||||||
// Manipulate queue 'a'
|
// Manipulate queue 'a'
|
||||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
//unset maxCapacity
|
//unset maxCapacity
|
||||||
a.setMaxCapacity(-0.01f);
|
a.setMaxCapacity(1.0f);
|
||||||
|
|
||||||
// Users
|
// Users
|
||||||
final String user_0 = "user_0";
|
final String user_0 = "user_0";
|
||||||
|
@ -377,7 +377,7 @@ public class TestLeafQueue {
|
||||||
// Mock the queue
|
// Mock the queue
|
||||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
//unset maxCapacity
|
//unset maxCapacity
|
||||||
a.setMaxCapacity(-0.01f);
|
a.setMaxCapacity(1.0f);
|
||||||
|
|
||||||
// Users
|
// Users
|
||||||
final String user_0 = "user_0";
|
final String user_0 = "user_0";
|
||||||
|
@ -491,7 +491,7 @@ public class TestLeafQueue {
|
||||||
|
|
||||||
// Revert max-capacity and user-limit-factor
|
// Revert max-capacity and user-limit-factor
|
||||||
// Now, allocations should goto app_3 since it's under user-limit
|
// Now, allocations should goto app_3 since it's under user-limit
|
||||||
a.setMaxCapacity(-0.01f);
|
a.setMaxCapacity(1.0f);
|
||||||
a.setUserLimitFactor(1);
|
a.setUserLimitFactor(1);
|
||||||
a.assignContainers(clusterResource, node_0);
|
a.assignContainers(clusterResource, node_0);
|
||||||
assertEquals(7*GB, a.getUsedResources().getMemory());
|
assertEquals(7*GB, a.getUsedResources().getMemory());
|
||||||
|
@ -548,7 +548,7 @@ public class TestLeafQueue {
|
||||||
// Manipulate queue 'a'
|
// Manipulate queue 'a'
|
||||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
//unset maxCapacity
|
//unset maxCapacity
|
||||||
a.setMaxCapacity(-0.01f);
|
a.setMaxCapacity(1.0f);
|
||||||
|
|
||||||
// Users
|
// Users
|
||||||
final String user_0 = "user_0";
|
final String user_0 = "user_0";
|
||||||
|
@ -571,7 +571,7 @@ public class TestLeafQueue {
|
||||||
String host_0 = "host_0";
|
String host_0 = "host_0";
|
||||||
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
|
SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB);
|
||||||
|
|
||||||
final int numNodes = 1;
|
final int numNodes = 2;
|
||||||
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
|
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
|
||||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
@ -646,7 +646,7 @@ public class TestLeafQueue {
|
||||||
// Manipulate queue 'a'
|
// Manipulate queue 'a'
|
||||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
//unset maxCapacity
|
//unset maxCapacity
|
||||||
a.setMaxCapacity(-0.01f);
|
a.setMaxCapacity(1.0f);
|
||||||
a.setUserLimitFactor(10);
|
a.setUserLimitFactor(10);
|
||||||
|
|
||||||
// Users
|
// Users
|
||||||
|
@ -673,7 +673,7 @@ public class TestLeafQueue {
|
||||||
String host_1 = "host_1";
|
String host_1 = "host_1";
|
||||||
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
|
SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB);
|
||||||
|
|
||||||
final int numNodes = 2;
|
final int numNodes = 3;
|
||||||
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
|
Resource clusterResource = Resources.createResource(numNodes * (4*GB));
|
||||||
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
when(csContext.getMaximumResourceCapability()).thenReturn(
|
when(csContext.getMaximumResourceCapability()).thenReturn(
|
||||||
|
|
|
@ -30,6 +30,8 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestQueueParsing.class);
|
private static final Log LOG = LogFactory.getLog(TestQueueParsing.class);
|
||||||
|
|
||||||
|
private static final double DELTA = 0.000001;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueParsing() throws Exception {
|
public void testQueueParsing() throws Exception {
|
||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
@ -37,6 +39,20 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
capacityScheduler.reinitialize(conf, null, null);
|
capacityScheduler.reinitialize(conf, null, null);
|
||||||
|
|
||||||
|
CSQueue a = capacityScheduler.getQueue("a");
|
||||||
|
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue b1 = capacityScheduler.getQueue("b1");
|
||||||
|
Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals("Parent B has no MAX_CAP",
|
||||||
|
0.85, b1.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
|
||||||
|
CSQueue c12 = capacityScheduler.getQueue("c12");
|
||||||
|
Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
|
||||||
|
Assert.assertEquals(0.7 * 0.55 * 0.7,
|
||||||
|
c12.getAbsoluteMaximumCapacity(), DELTA);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||||
|
@ -47,12 +63,14 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
conf.setCapacity(A, 10);
|
conf.setCapacity(A, 10);
|
||||||
|
conf.setMaximumCapacity(A, 15);
|
||||||
|
|
||||||
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
conf.setCapacity(B, 20);
|
conf.setCapacity(B, 20);
|
||||||
|
|
||||||
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||||
conf.setCapacity(C, 70);
|
conf.setCapacity(C, 70);
|
||||||
|
conf.setMaximumCapacity(C, 70);
|
||||||
|
|
||||||
LOG.info("Setup top-level queues");
|
LOG.info("Setup top-level queues");
|
||||||
|
|
||||||
|
@ -61,15 +79,20 @@ public class TestQueueParsing {
|
||||||
final String A2 = A + ".a2";
|
final String A2 = A + ".a2";
|
||||||
conf.setQueues(A, new String[] {"a1", "a2"});
|
conf.setQueues(A, new String[] {"a1", "a2"});
|
||||||
conf.setCapacity(A1, 30);
|
conf.setCapacity(A1, 30);
|
||||||
|
conf.setMaximumCapacity(A1, 45);
|
||||||
conf.setCapacity(A2, 70);
|
conf.setCapacity(A2, 70);
|
||||||
|
conf.setMaximumCapacity(A2, 85);
|
||||||
|
|
||||||
final String B1 = B + ".b1";
|
final String B1 = B + ".b1";
|
||||||
final String B2 = B + ".b2";
|
final String B2 = B + ".b2";
|
||||||
final String B3 = B + ".b3";
|
final String B3 = B + ".b3";
|
||||||
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
|
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
|
||||||
conf.setCapacity(B1, 50);
|
conf.setCapacity(B1, 50);
|
||||||
|
conf.setMaximumCapacity(B1, 85);
|
||||||
conf.setCapacity(B2, 30);
|
conf.setCapacity(B2, 30);
|
||||||
|
conf.setMaximumCapacity(B2, 35);
|
||||||
conf.setCapacity(B3, 20);
|
conf.setCapacity(B3, 20);
|
||||||
|
conf.setMaximumCapacity(B3, 35);
|
||||||
|
|
||||||
final String C1 = C + ".c1";
|
final String C1 = C + ".c1";
|
||||||
final String C2 = C + ".c2";
|
final String C2 = C + ".c2";
|
||||||
|
@ -77,9 +100,13 @@ public class TestQueueParsing {
|
||||||
final String C4 = C + ".c4";
|
final String C4 = C + ".c4";
|
||||||
conf.setQueues(C, new String[] {"c1", "c2", "c3", "c4"});
|
conf.setQueues(C, new String[] {"c1", "c2", "c3", "c4"});
|
||||||
conf.setCapacity(C1, 50);
|
conf.setCapacity(C1, 50);
|
||||||
|
conf.setMaximumCapacity(C1, 55);
|
||||||
conf.setCapacity(C2, 10);
|
conf.setCapacity(C2, 10);
|
||||||
|
conf.setMaximumCapacity(C2, 25);
|
||||||
conf.setCapacity(C3, 35);
|
conf.setCapacity(C3, 35);
|
||||||
|
conf.setMaximumCapacity(C3, 38);
|
||||||
conf.setCapacity(C4, 5);
|
conf.setCapacity(C4, 5);
|
||||||
|
conf.setMaximumCapacity(C4, 5);
|
||||||
|
|
||||||
LOG.info("Setup 2nd-level queues");
|
LOG.info("Setup 2nd-level queues");
|
||||||
|
|
||||||
|
@ -89,8 +116,11 @@ public class TestQueueParsing {
|
||||||
final String C13 = C1 + ".c13";
|
final String C13 = C1 + ".c13";
|
||||||
conf.setQueues(C1, new String[] {"c11", "c12", "c13"});
|
conf.setQueues(C1, new String[] {"c11", "c12", "c13"});
|
||||||
conf.setCapacity(C11, 15);
|
conf.setCapacity(C11, 15);
|
||||||
|
conf.setMaximumCapacity(C11, 30);
|
||||||
conf.setCapacity(C12, 45);
|
conf.setCapacity(C12, 45);
|
||||||
|
conf.setMaximumCapacity(C12, 70);
|
||||||
conf.setCapacity(C13, 40);
|
conf.setCapacity(C13, 40);
|
||||||
|
conf.setMaximumCapacity(C13, 40);
|
||||||
|
|
||||||
LOG.info("Setup 3rd-level queues");
|
LOG.info("Setup 3rd-level queues");
|
||||||
}
|
}
|
||||||
|
|
|
@ -235,12 +235,13 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
Element qElem = (Element) queues.item(j);
|
Element qElem = (Element) queues.item(j);
|
||||||
String qName = WebServicesTestUtils.getXmlString(qElem, "queueName");
|
String qName = WebServicesTestUtils.getXmlString(qElem, "queueName");
|
||||||
String q = CapacitySchedulerConfiguration.ROOT + "." + qName;
|
String q = CapacitySchedulerConfiguration.ROOT + "." + qName;
|
||||||
verifySubQueueXML(qElem, q, 100);
|
verifySubQueueXML(qElem, q, 100, 100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void verifySubQueueXML(Element qElem, String q, float parentAbsCapacity)
|
public void verifySubQueueXML(Element qElem, String q,
|
||||||
|
float parentAbsCapacity, float parentAbsMaxCapacity)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
NodeList queues = qElem.getElementsByTagName("subQueues");
|
NodeList queues = qElem.getElementsByTagName("subQueues");
|
||||||
QueueInfo qi = (queues != null) ? new QueueInfo() : new LeafQueueInfo();
|
QueueInfo qi = (queues != null) ? new QueueInfo() : new LeafQueueInfo();
|
||||||
|
@ -258,14 +259,15 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
WebServicesTestUtils.getXmlString(qElem, "usedResources");
|
WebServicesTestUtils.getXmlString(qElem, "usedResources");
|
||||||
qi.queueName = WebServicesTestUtils.getXmlString(qElem, "queueName");
|
qi.queueName = WebServicesTestUtils.getXmlString(qElem, "queueName");
|
||||||
qi.state = WebServicesTestUtils.getXmlString(qElem, "state");
|
qi.state = WebServicesTestUtils.getXmlString(qElem, "state");
|
||||||
verifySubQueueGeneric(q, qi, parentAbsCapacity);
|
verifySubQueueGeneric(q, qi, parentAbsCapacity, parentAbsMaxCapacity);
|
||||||
|
|
||||||
if (queues != null) {
|
if (queues != null) {
|
||||||
for (int j = 0; j < queues.getLength(); j++) {
|
for (int j = 0; j < queues.getLength(); j++) {
|
||||||
Element subqElem = (Element) queues.item(j);
|
Element subqElem = (Element) queues.item(j);
|
||||||
String qName = WebServicesTestUtils.getXmlString(subqElem, "queueName");
|
String qName = WebServicesTestUtils.getXmlString(subqElem, "queueName");
|
||||||
String q2 = q + "." + qName;
|
String q2 = q + "." + qName;
|
||||||
verifySubQueueXML(subqElem, q2, qi.absoluteCapacity);
|
verifySubQueueXML(subqElem, q2,
|
||||||
|
qi.absoluteCapacity, qi.absoluteMaxCapacity);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LeafQueueInfo lqi = (LeafQueueInfo) qi;
|
LeafQueueInfo lqi = (LeafQueueInfo) qi;
|
||||||
|
@ -309,7 +311,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
for (int i = 0; i < arr.length(); i++) {
|
for (int i = 0; i < arr.length(); i++) {
|
||||||
JSONObject obj = arr.getJSONObject(i);
|
JSONObject obj = arr.getJSONObject(i);
|
||||||
String q = CapacitySchedulerConfiguration.ROOT + "." + obj.getString("queueName");
|
String q = CapacitySchedulerConfiguration.ROOT + "." + obj.getString("queueName");
|
||||||
verifySubQueue(obj, q, 100);
|
verifySubQueue(obj, q, 100, 100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -323,7 +325,8 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
assertTrue("queueName doesn't match", "root".matches(queueName));
|
assertTrue("queueName doesn't match", "root".matches(queueName));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifySubQueue(JSONObject info, String q, float parentAbsCapacity)
|
private void verifySubQueue(JSONObject info, String q,
|
||||||
|
float parentAbsCapacity, float parentAbsMaxCapacity)
|
||||||
throws JSONException, Exception {
|
throws JSONException, Exception {
|
||||||
int numExpectedElements = 11;
|
int numExpectedElements = 11;
|
||||||
boolean isParentQueue = true;
|
boolean isParentQueue = true;
|
||||||
|
@ -345,7 +348,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
qi.queueName = info.getString("queueName");
|
qi.queueName = info.getString("queueName");
|
||||||
qi.state = info.getString("state");
|
qi.state = info.getString("state");
|
||||||
|
|
||||||
verifySubQueueGeneric(q, qi, parentAbsCapacity);
|
verifySubQueueGeneric(q, qi, parentAbsCapacity, parentAbsMaxCapacity);
|
||||||
|
|
||||||
if (isParentQueue) {
|
if (isParentQueue) {
|
||||||
JSONArray arr = info.getJSONArray("subQueues");
|
JSONArray arr = info.getJSONArray("subQueues");
|
||||||
|
@ -353,7 +356,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
for (int i = 0; i < arr.length(); i++) {
|
for (int i = 0; i < arr.length(); i++) {
|
||||||
JSONObject obj = arr.getJSONObject(i);
|
JSONObject obj = arr.getJSONObject(i);
|
||||||
String q2 = q + "." + obj.getString("queueName");
|
String q2 = q + "." + obj.getString("queueName");
|
||||||
verifySubQueue(obj, q2, qi.absoluteCapacity);
|
verifySubQueue(obj, q2, qi.absoluteCapacity, qi.absoluteMaxCapacity);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LeafQueueInfo lqi = (LeafQueueInfo) qi;
|
LeafQueueInfo lqi = (LeafQueueInfo) qi;
|
||||||
|
@ -371,7 +374,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifySubQueueGeneric(String q, QueueInfo info,
|
private void verifySubQueueGeneric(String q, QueueInfo info,
|
||||||
float parentAbsCapacity) throws Exception {
|
float parentAbsCapacity, float parentAbsMaxCapacity) throws Exception {
|
||||||
String[] qArr = q.split("\\.");
|
String[] qArr = q.split("\\.");
|
||||||
assertTrue("q name invalid: " + q, qArr.length > 1);
|
assertTrue("q name invalid: " + q, qArr.length > 1);
|
||||||
String qshortName = qArr[qArr.length - 1];
|
String qshortName = qArr[qArr.length - 1];
|
||||||
|
@ -380,7 +383,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
|
||||||
assertEquals("capacity doesn't match", csConf.getCapacity(q),
|
assertEquals("capacity doesn't match", csConf.getCapacity(q),
|
||||||
info.capacity, 1e-3f);
|
info.capacity, 1e-3f);
|
||||||
float expectCapacity = csConf.getMaximumCapacity(q);
|
float expectCapacity = csConf.getMaximumCapacity(q);
|
||||||
float expectAbsMaxCapacity = parentAbsCapacity * (info.maxCapacity/100);
|
float expectAbsMaxCapacity = parentAbsMaxCapacity * (info.maxCapacity/100);
|
||||||
if (CapacitySchedulerConfiguration.UNDEFINED == expectCapacity) {
|
if (CapacitySchedulerConfiguration.UNDEFINED == expectCapacity) {
|
||||||
expectCapacity = 100;
|
expectCapacity = 100;
|
||||||
expectAbsMaxCapacity = 100;
|
expectAbsMaxCapacity = 100;
|
||||||
|
|
Loading…
Reference in New Issue