Merge -c 1205260 from trunk to branch-0.23 to fix MAPREDUCE-3329.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1205261 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fbfe1b0afa
commit
7b2940190d
|
@ -121,6 +121,9 @@ Release 0.23.1 - Unreleased
|
||||||
MAPREDUCE-3408. yarn-daemon.sh unconditionnaly sets yarn.root.logger
|
MAPREDUCE-3408. yarn-daemon.sh unconditionnaly sets yarn.root.logger
|
||||||
(Bruno Mahe via mahadev)
|
(Bruno Mahe via mahadev)
|
||||||
|
|
||||||
|
MAPREDUCE-3329. Fixed CapacityScheduler to ensure maximum-capacity cannot
|
||||||
|
be lesser than capacity for any queue. (acmurthy)
|
||||||
|
|
||||||
Release 0.23.0 - 2011-11-01
|
Release 0.23.0 - 2011-11-01
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -0,0 +1,17 @@
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
class CSQueueUtils {
|
||||||
|
|
||||||
|
public static void checkMaxCapacity(String queueName,
|
||||||
|
float capacity, float maximumCapacity) {
|
||||||
|
if (maximumCapacity != CapacitySchedulerConfiguration.UNDEFINED &&
|
||||||
|
maximumCapacity < capacity) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Illegal call to setMaxCapacity. " +
|
||||||
|
"Queue '" + queueName + "' has " +
|
||||||
|
"capacity (" + capacity + ") greater than " +
|
||||||
|
"maximumCapacity (" + maximumCapacity + ")" );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -160,6 +160,10 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
||||||
return maxCapacity;
|
return maxCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMaximumCapacity(String queue, int maxCapacity) {
|
||||||
|
setInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
|
||||||
|
}
|
||||||
|
|
||||||
public int getUserLimit(String queue) {
|
public int getUserLimit(String queue) {
|
||||||
int userLimit =
|
int userLimit =
|
||||||
getInt(getQueuePrefix(queue) + USER_LIMIT, DEFAULT_USER_LIMIT);
|
getInt(getQueuePrefix(queue) + USER_LIMIT, DEFAULT_USER_LIMIT);
|
||||||
|
|
|
@ -211,16 +211,19 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
private synchronized void setupQueueConfigs(
|
private synchronized void setupQueueConfigs(
|
||||||
float capacity, float absoluteCapacity,
|
float capacity, float absoluteCapacity,
|
||||||
float maxCapacity, float absoluteMaxCapacity,
|
float maximumCapacity, float absoluteMaxCapacity,
|
||||||
int userLimit, float userLimitFactor,
|
int userLimit, float userLimitFactor,
|
||||||
int maxApplications, int maxApplicationsPerUser,
|
int maxApplications, int maxApplicationsPerUser,
|
||||||
int maxActiveApplications, int maxActiveApplicationsPerUser,
|
int maxActiveApplications, int maxActiveApplicationsPerUser,
|
||||||
QueueState state, Map<QueueACL, AccessControlList> acls)
|
QueueState state, Map<QueueACL, AccessControlList> acls)
|
||||||
{
|
{
|
||||||
|
// Sanity check
|
||||||
|
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||||
|
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
this.absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
||||||
|
|
||||||
this.maximumCapacity = maxCapacity;
|
this.maximumCapacity = maximumCapacity;
|
||||||
this.absoluteMaxCapacity = absoluteMaxCapacity;
|
this.absoluteMaxCapacity = absoluteMaxCapacity;
|
||||||
|
|
||||||
this.userLimit = userLimit;
|
this.userLimit = userLimit;
|
||||||
|
@ -236,9 +239,9 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
this.acls = acls;
|
this.acls = acls;
|
||||||
|
|
||||||
this.queueInfo.setCapacity(capacity);
|
this.queueInfo.setCapacity(this.capacity);
|
||||||
this.queueInfo.setMaximumCapacity(maximumCapacity);
|
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
|
||||||
this.queueInfo.setQueueState(state);
|
this.queueInfo.setQueueState(this.state);
|
||||||
|
|
||||||
StringBuilder aclsString = new StringBuilder();
|
StringBuilder aclsString = new StringBuilder();
|
||||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||||
|
@ -250,7 +253,7 @@ public class LeafQueue implements CSQueue {
|
||||||
" [= (float) configuredCapacity / 100 ]" + "\n" +
|
" [= (float) configuredCapacity / 100 ]" + "\n" +
|
||||||
"asboluteCapacity = " + absoluteCapacity +
|
"asboluteCapacity = " + absoluteCapacity +
|
||||||
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
|
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
|
||||||
"maxCapacity = " + maxCapacity +
|
"maxCapacity = " + maximumCapacity +
|
||||||
" [= configuredMaxCapacity ]" + "\n" +
|
" [= configuredMaxCapacity ]" + "\n" +
|
||||||
"absoluteMaxCapacity = " + absoluteMaxCapacity +
|
"absoluteMaxCapacity = " + absoluteMaxCapacity +
|
||||||
" [= Float.MAX_VALUE if maximumCapacity undefined, " +
|
" [= Float.MAX_VALUE if maximumCapacity undefined, " +
|
||||||
|
@ -394,6 +397,9 @@ public class LeafQueue implements CSQueue {
|
||||||
* @param maximumCapacity new max capacity
|
* @param maximumCapacity new max capacity
|
||||||
*/
|
*/
|
||||||
synchronized void setMaxCapacity(float maximumCapacity) {
|
synchronized void setMaxCapacity(float maximumCapacity) {
|
||||||
|
// Sanity check
|
||||||
|
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||||
|
|
||||||
this.maximumCapacity = maximumCapacity;
|
this.maximumCapacity = maximumCapacity;
|
||||||
this.absoluteMaxCapacity =
|
this.absoluteMaxCapacity =
|
||||||
(maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ?
|
(maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ?
|
||||||
|
|
|
@ -153,6 +153,9 @@ public class ParentQueue implements CSQueue {
|
||||||
float maximumCapacity, float absoluteMaxCapacity,
|
float maximumCapacity, float absoluteMaxCapacity,
|
||||||
QueueState state, Map<QueueACL, AccessControlList> acls
|
QueueState state, Map<QueueACL, AccessControlList> acls
|
||||||
) {
|
) {
|
||||||
|
// Sanity check
|
||||||
|
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||||
|
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.absoluteCapacity = absoluteCapacity;
|
this.absoluteCapacity = absoluteCapacity;
|
||||||
this.maximumCapacity = maximumCapacity;
|
this.maximumCapacity = maximumCapacity;
|
||||||
|
@ -162,9 +165,9 @@ public class ParentQueue implements CSQueue {
|
||||||
|
|
||||||
this.acls = acls;
|
this.acls = acls;
|
||||||
|
|
||||||
this.queueInfo.setCapacity(capacity);
|
this.queueInfo.setCapacity(this.capacity);
|
||||||
this.queueInfo.setMaximumCapacity(maximumCapacity);
|
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
|
||||||
this.queueInfo.setQueueState(state);
|
this.queueInfo.setQueueState(this.state);
|
||||||
|
|
||||||
StringBuilder aclsString = new StringBuilder();
|
StringBuilder aclsString = new StringBuilder();
|
||||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||||
|
@ -484,6 +487,9 @@ public class ParentQueue implements CSQueue {
|
||||||
* @param maximumCapacity new max capacity
|
* @param maximumCapacity new max capacity
|
||||||
*/
|
*/
|
||||||
synchronized void setMaxCapacity(float maximumCapacity) {
|
synchronized void setMaxCapacity(float maximumCapacity) {
|
||||||
|
// Sanity check
|
||||||
|
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||||
|
|
||||||
this.maximumCapacity = maximumCapacity;
|
this.maximumCapacity = maximumCapacity;
|
||||||
float parentAbsoluteCapacity =
|
float parentAbsoluteCapacity =
|
||||||
(rootQueue) ? 100.0f : parent.getAbsoluteCapacity();
|
(rootQueue) ? 100.0f : parent.getAbsoluteCapacity();
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
@ -35,7 +37,6 @@ public class TestQueueParsing {
|
||||||
|
|
||||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
capacityScheduler.reinitialize(conf, null, null);
|
capacityScheduler.reinitialize(conf, null, null);
|
||||||
//capacityScheduler.g
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||||
|
@ -104,4 +105,48 @@ public class TestQueueParsing {
|
||||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||||
capacityScheduler.reinitialize(conf, null, null);
|
capacityScheduler.reinitialize(conf, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMaxCapacity() throws Exception {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
|
||||||
|
conf.setQueues(CapacityScheduler.ROOT, new String[] {"a", "b", "c"});
|
||||||
|
conf.setCapacity(CapacityScheduler.ROOT, 100);
|
||||||
|
|
||||||
|
final String A = CapacityScheduler.ROOT + ".a";
|
||||||
|
conf.setCapacity(A, 50);
|
||||||
|
conf.setMaximumCapacity(A, 60);
|
||||||
|
|
||||||
|
final String B = CapacityScheduler.ROOT + ".b";
|
||||||
|
conf.setCapacity(B, 50);
|
||||||
|
conf.setMaximumCapacity(B, 45); // Should throw an exception
|
||||||
|
|
||||||
|
|
||||||
|
boolean fail = false;
|
||||||
|
CapacityScheduler capacityScheduler;
|
||||||
|
try {
|
||||||
|
capacityScheduler = new CapacityScheduler();
|
||||||
|
capacityScheduler.reinitialize(conf, null, null);
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
fail = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong maxCap",
|
||||||
|
fail);
|
||||||
|
|
||||||
|
conf.setMaximumCapacity(B, 60);
|
||||||
|
|
||||||
|
// Now this should work
|
||||||
|
capacityScheduler = new CapacityScheduler();
|
||||||
|
capacityScheduler.reinitialize(conf, null, null);
|
||||||
|
|
||||||
|
fail = false;
|
||||||
|
try {
|
||||||
|
LeafQueue a = (LeafQueue)capacityScheduler.getQueue(A);
|
||||||
|
a.setMaxCapacity(45);
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
fail = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " +
|
||||||
|
"setMaxCap", fail);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue