merge -r 1401667:1401668 from trunk. FIXES: YARN-177

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1401669 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas Graves 2012-10-24 13:25:19 +00:00
parent e49fb14997
commit 1ef0ff3af5
5 changed files with 119 additions and 46 deletions

View File

@ -157,6 +157,9 @@ Release 0.23.5 - UNRELEASED
YARN-174. Modify NodeManager to pass the user's configuration even when
rebooting. (vinodkv)
YARN-177. CapacityScheduler - adding a queue while the RM is running has
wacky results (acmurthy vai tgraves)
Release 0.23.4 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -50,6 +50,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public CSQueue getParent();
/**
* Set the parent <code>Queue</code>.
* @param newParentQueue new parent queue
*/
public void setParent(CSQueue newParentQueue);
/**
* Get the queue name.
* @return the queue name
@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
/**
* Reinitialize the queue.
* @param queue new queue to re-initalize from
* @param newlyParsedQueue new queue to re-initalize from
* @param clusterResource resources in the cluster
*/
public void reinitialize(CSQueue queue, Resource clusterResource)
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException;
/**

View File

@ -223,7 +223,7 @@ public class LeafQueue implements CSQueue {
{
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
float absCapacity = parent.getAbsoluteCapacity() * capacity;
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
this.capacity = capacity;
@ -256,7 +256,7 @@ public class LeafQueue implements CSQueue {
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + capacity +
@ -339,10 +339,15 @@ public class LeafQueue implements CSQueue {
}
@Override
public CSQueue getParent() {
public synchronized CSQueue getParent() {
return parent;
}
@Override
public synchronized void setParent(CSQueue newParentQueue) {
this.parent = (ParentQueue)newParentQueue;
}
@Override
public String getQueueName() {
return queueName;
@ -350,7 +355,7 @@ public class LeafQueue implements CSQueue {
@Override
public String getQueuePath() {
return parent.getQueuePath() + "." + getQueueName();
return getParent().getQueuePath() + "." + getQueueName();
}
/**
@ -430,7 +435,9 @@ public class LeafQueue implements CSQueue {
synchronized void setMaxCapacity(float maximumCapacity) {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
float absMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(
maximumCapacity, getParent());
CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
this.maximumCapacity = maximumCapacity;
@ -453,10 +460,6 @@ public class LeafQueue implements CSQueue {
this.userLimitFactor = userLimitFactor;
}
synchronized void setParentQueue(CSQueue parent) {
this.parent = parent;
}
@Override
public synchronized int getNumApplications() {
return getNumPendingApplications() + getNumActiveApplications();
@ -559,26 +562,28 @@ public class LeafQueue implements CSQueue {
}
@Override
public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
public synchronized void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
// Sanity check
if (!(queue instanceof LeafQueue) ||
!queue.getQueuePath().equals(getQueuePath())) {
if (!(newlyParsedQueue instanceof LeafQueue) ||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
throw new IOException("Trying to reinitialize " + getQueuePath() +
" from " + queue.getQueuePath());
" from " + newlyParsedQueue.getQueuePath());
}
LeafQueue leafQueue = (LeafQueue)queue;
LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
setupQueueConfigs(
clusterResource,
leafQueue.capacity, leafQueue.absoluteCapacity,
leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
leafQueue.userLimit, leafQueue.userLimitFactor,
leafQueue.maxApplications,
leafQueue.getMaxApplicationsPerUser(),
leafQueue.getMaximumActiveApplications(),
leafQueue.getMaximumActiveApplicationsPerUser(),
leafQueue.state, leafQueue.acls);
newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
newlyParsedLeafQueue.maximumCapacity,
newlyParsedLeafQueue.absoluteMaxCapacity,
newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor,
newlyParsedLeafQueue.maxApplications,
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
}
@Override
@ -591,7 +596,7 @@ public class LeafQueue implements CSQueue {
}
// Check if parent-queue allows access
return parent.hasAccess(acl, user);
return getParent().hasAccess(acl, user);
}
@Override
@ -649,10 +654,10 @@ public class LeafQueue implements CSQueue {
// Inform the parent queue
try {
parent.submitApplication(application, userName, queue);
getParent().submitApplication(application, userName, queue);
} catch (AccessControlException ace) {
LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
getParent().getQueuePath(), ace);
removeApplication(application, user);
throw ace;
}
@ -708,7 +713,7 @@ public class LeafQueue implements CSQueue {
}
// Inform the parent queue
parent.finishApplication(application, queue);
getParent().finishApplication(application, queue);
}
public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
@ -1351,7 +1356,7 @@ public class LeafQueue implements CSQueue {
}
// Inform the parent queue
parent.completedContainer(clusterResource, application,
getParent().completedContainer(clusterResource, application,
node, rmContainer, null, event);
}
}
@ -1361,7 +1366,7 @@ public class LeafQueue implements CSQueue {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
++numContainers;
// Update user metrics
@ -1386,7 +1391,7 @@ public class LeafQueue implements CSQueue {
// Update queue metrics
Resources.subtractFrom(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
--numContainers;
// Update user metrics
@ -1417,7 +1422,7 @@ public class LeafQueue implements CSQueue {
// Update metrics
CSQueueUtils.updateQueueStatistics(
this, parent, clusterResource, minimumAllocation);
this, getParent(), clusterResource, minimumAllocation);
// Update application properties
for (FiCaSchedulerApp application : activeApplications) {
@ -1488,7 +1493,7 @@ public class LeafQueue implements CSQueue {
synchronized (this) {
allocateResource(clusterResource, application, container.getResource());
}
parent.recoverContainer(clusterResource, application, container);
getParent().recoverContainer(clusterResource, application, container);
}

View File

@ -60,7 +60,7 @@ public class ParentQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(ParentQueue.class);
private final CSQueue parent;
private CSQueue parent;
private final String queueName;
private float capacity;
@ -216,10 +216,15 @@ public class ParentQueue implements CSQueue {
}
@Override
public CSQueue getParent() {
public synchronized CSQueue getParent() {
return parent;
}
@Override
public synchronized void setParent(CSQueue newParentQueue) {
this.parent = (ParentQueue)newParentQueue;
}
@Override
public String getQueueName() {
return queueName;
@ -357,37 +362,52 @@ public class ParentQueue implements CSQueue {
}
@Override
public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
public synchronized void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
// Sanity check
if (!(queue instanceof ParentQueue) ||
!queue.getQueuePath().equals(getQueuePath())) {
if (!(newlyParsedQueue instanceof ParentQueue) ||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
throw new IOException("Trying to reinitialize " + getQueuePath() +
" from " + queue.getQueuePath());
" from " + newlyParsedQueue.getQueuePath());
}
ParentQueue parentQueue = (ParentQueue)queue;
ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
// Set new configs
setupQueueConfigs(clusterResource,
parentQueue.capacity, parentQueue.absoluteCapacity,
parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
parentQueue.state, parentQueue.acls);
newlyParsedParentQueue.capacity,
newlyParsedParentQueue.absoluteCapacity,
newlyParsedParentQueue.maximumCapacity,
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
newlyParsedParentQueue.acls);
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
Map<String, CSQueue> newChildQueues = getQueues(parentQueue.childQueues);
Map<String, CSQueue> newChildQueues =
getQueues(newlyParsedParentQueue.childQueues);
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
String newChildQueueName = e.getKey();
CSQueue newChildQueue = e.getValue();
CSQueue childQueue = currentChildQueues.get(newChildQueueName);
if (childQueue != null){
// Check if the child-queue already exists
if (childQueue != null) {
// Re-init existing child queues
childQueue.reinitialize(newChildQueue, clusterResource);
LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
} else {
// New child queue, do not re-init
// Set parent to 'this'
newChildQueue.setParent(this);
// Save in list of current child queues
currentChildQueues.put(newChildQueueName, newChildQueue);
LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
}
}

View File

@ -378,4 +378,43 @@ public class TestCapacityScheduler {
Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
}
@Test
public void testRefreshQueuesWithNewQueue() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new ClientToAMTokenSecretManagerInRM()));
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
// Add a new queue b4
String B4 = B + ".b4";
float B4_CAPACITY = 10;
B3_CAPACITY -= B4_CAPACITY;
try {
conf.setCapacity(A, 80f);
conf.setCapacity(B, 20f);
conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"});
conf.setCapacity(B1, B1_CAPACITY);
conf.setCapacity(B2, B2_CAPACITY);
conf.setCapacity(B3, B3_CAPACITY);
conf.setCapacity(B4, B4_CAPACITY);
cs.reinitialize(conf,null);
checkQueueCapacities(cs, 80f, 20f);
// Verify parent for B4
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueB4 = findQueue(queueB, B4);
assertEquals(queueB, queueB4.getParent());
} finally {
B3_CAPACITY += B4_CAPACITY;
}
}
}