YARN-177. CapacityScheduler - adding a queue while the RM is running has wacky results (acmurthy vai tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1401668 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ca4bc1a417
commit
cc523683cf
|
@ -174,6 +174,9 @@ Release 0.23.5 - UNRELEASED
|
|||
YARN-174. Modify NodeManager to pass the user's configuration even when
|
||||
rebooting. (vinodkv)
|
||||
|
||||
YARN-177. CapacityScheduler - adding a queue while the RM is running has
|
||||
wacky results (acmurthy vai tgraves)
|
||||
|
||||
Release 0.23.4 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -50,6 +50,12 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
|
|||
*/
|
||||
public CSQueue getParent();
|
||||
|
||||
/**
|
||||
* Set the parent <code>Queue</code>.
|
||||
* @param newParentQueue new parent queue
|
||||
*/
|
||||
public void setParent(CSQueue newParentQueue);
|
||||
|
||||
/**
|
||||
* Get the queue name.
|
||||
* @return the queue name
|
||||
|
@ -195,10 +201,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
|
|||
|
||||
/**
|
||||
* Reinitialize the queue.
|
||||
* @param queue new queue to re-initalize from
|
||||
* @param newlyParsedQueue new queue to re-initalize from
|
||||
* @param clusterResource resources in the cluster
|
||||
*/
|
||||
public void reinitialize(CSQueue queue, Resource clusterResource)
|
||||
public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -223,7 +223,7 @@ public class LeafQueue implements CSQueue {
|
|||
{
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
float absCapacity = parent.getAbsoluteCapacity() * capacity;
|
||||
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
|
||||
CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
|
||||
|
||||
this.capacity = capacity;
|
||||
|
@ -256,7 +256,7 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(
|
||||
this, parent, clusterResource, minimumAllocation);
|
||||
this, getParent(), clusterResource, minimumAllocation);
|
||||
|
||||
LOG.info("Initializing " + queueName + "\n" +
|
||||
"capacity = " + capacity +
|
||||
|
@ -339,10 +339,15 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CSQueue getParent() {
|
||||
public synchronized CSQueue getParent() {
|
||||
return parent;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized void setParent(CSQueue newParentQueue) {
|
||||
this.parent = (ParentQueue)newParentQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return queueName;
|
||||
|
@ -350,7 +355,7 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
@Override
|
||||
public String getQueuePath() {
|
||||
return parent.getQueuePath() + "." + getQueueName();
|
||||
return getParent().getQueuePath() + "." + getQueueName();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -430,7 +435,9 @@ public class LeafQueue implements CSQueue {
|
|||
synchronized void setMaxCapacity(float maximumCapacity) {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
|
||||
float absMaxCapacity =
|
||||
CSQueueUtils.computeAbsoluteMaximumCapacity(
|
||||
maximumCapacity, getParent());
|
||||
CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
|
||||
|
||||
this.maximumCapacity = maximumCapacity;
|
||||
|
@ -453,10 +460,6 @@ public class LeafQueue implements CSQueue {
|
|||
this.userLimitFactor = userLimitFactor;
|
||||
}
|
||||
|
||||
synchronized void setParentQueue(CSQueue parent) {
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int getNumApplications() {
|
||||
return getNumPendingApplications() + getNumActiveApplications();
|
||||
|
@ -559,26 +562,28 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
|
||||
public synchronized void reinitialize(
|
||||
CSQueue newlyParsedQueue, Resource clusterResource)
|
||||
throws IOException {
|
||||
// Sanity check
|
||||
if (!(queue instanceof LeafQueue) ||
|
||||
!queue.getQueuePath().equals(getQueuePath())) {
|
||||
if (!(newlyParsedQueue instanceof LeafQueue) ||
|
||||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException("Trying to reinitialize " + getQueuePath() +
|
||||
" from " + queue.getQueuePath());
|
||||
" from " + newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
|
||||
LeafQueue leafQueue = (LeafQueue)queue;
|
||||
LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
|
||||
setupQueueConfigs(
|
||||
clusterResource,
|
||||
leafQueue.capacity, leafQueue.absoluteCapacity,
|
||||
leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
|
||||
leafQueue.userLimit, leafQueue.userLimitFactor,
|
||||
leafQueue.maxApplications,
|
||||
leafQueue.getMaxApplicationsPerUser(),
|
||||
leafQueue.getMaximumActiveApplications(),
|
||||
leafQueue.getMaximumActiveApplicationsPerUser(),
|
||||
leafQueue.state, leafQueue.acls);
|
||||
newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
|
||||
newlyParsedLeafQueue.maximumCapacity,
|
||||
newlyParsedLeafQueue.absoluteMaxCapacity,
|
||||
newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor,
|
||||
newlyParsedLeafQueue.maxApplications,
|
||||
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
|
||||
newlyParsedLeafQueue.getMaximumActiveApplications(),
|
||||
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
|
||||
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -591,7 +596,7 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
// Check if parent-queue allows access
|
||||
return parent.hasAccess(acl, user);
|
||||
return getParent().hasAccess(acl, user);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -649,10 +654,10 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
// Inform the parent queue
|
||||
try {
|
||||
parent.submitApplication(application, userName, queue);
|
||||
getParent().submitApplication(application, userName, queue);
|
||||
} catch (AccessControlException ace) {
|
||||
LOG.info("Failed to submit application to parent-queue: " +
|
||||
parent.getQueuePath(), ace);
|
||||
getParent().getQueuePath(), ace);
|
||||
removeApplication(application, user);
|
||||
throw ace;
|
||||
}
|
||||
|
@ -708,7 +713,7 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
// Inform the parent queue
|
||||
parent.finishApplication(application, queue);
|
||||
getParent().finishApplication(application, queue);
|
||||
}
|
||||
|
||||
public synchronized void removeApplication(FiCaSchedulerApp application, User user) {
|
||||
|
@ -1351,7 +1356,7 @@ public class LeafQueue implements CSQueue {
|
|||
}
|
||||
|
||||
// Inform the parent queue
|
||||
parent.completedContainer(clusterResource, application,
|
||||
getParent().completedContainer(clusterResource, application,
|
||||
node, rmContainer, null, event);
|
||||
}
|
||||
}
|
||||
|
@ -1361,7 +1366,7 @@ public class LeafQueue implements CSQueue {
|
|||
// Update queue metrics
|
||||
Resources.addTo(usedResources, resource);
|
||||
CSQueueUtils.updateQueueStatistics(
|
||||
this, parent, clusterResource, minimumAllocation);
|
||||
this, getParent(), clusterResource, minimumAllocation);
|
||||
++numContainers;
|
||||
|
||||
// Update user metrics
|
||||
|
@ -1386,7 +1391,7 @@ public class LeafQueue implements CSQueue {
|
|||
// Update queue metrics
|
||||
Resources.subtractFrom(usedResources, resource);
|
||||
CSQueueUtils.updateQueueStatistics(
|
||||
this, parent, clusterResource, minimumAllocation);
|
||||
this, getParent(), clusterResource, minimumAllocation);
|
||||
--numContainers;
|
||||
|
||||
// Update user metrics
|
||||
|
@ -1417,7 +1422,7 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(
|
||||
this, parent, clusterResource, minimumAllocation);
|
||||
this, getParent(), clusterResource, minimumAllocation);
|
||||
|
||||
// Update application properties
|
||||
for (FiCaSchedulerApp application : activeApplications) {
|
||||
|
@ -1488,7 +1493,7 @@ public class LeafQueue implements CSQueue {
|
|||
synchronized (this) {
|
||||
allocateResource(clusterResource, application, container.getResource());
|
||||
}
|
||||
parent.recoverContainer(clusterResource, application, container);
|
||||
getParent().recoverContainer(clusterResource, application, container);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ParentQueue implements CSQueue {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(ParentQueue.class);
|
||||
|
||||
private final CSQueue parent;
|
||||
private CSQueue parent;
|
||||
private final String queueName;
|
||||
|
||||
private float capacity;
|
||||
|
@ -216,10 +216,15 @@ public class ParentQueue implements CSQueue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CSQueue getParent() {
|
||||
public synchronized CSQueue getParent() {
|
||||
return parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void setParent(CSQueue newParentQueue) {
|
||||
this.parent = (ParentQueue)newParentQueue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return queueName;
|
||||
|
@ -357,37 +362,52 @@ public class ParentQueue implements CSQueue {
|
|||
}
|
||||
|
||||
@Override
|
||||
public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
|
||||
public synchronized void reinitialize(
|
||||
CSQueue newlyParsedQueue, Resource clusterResource)
|
||||
throws IOException {
|
||||
// Sanity check
|
||||
if (!(queue instanceof ParentQueue) ||
|
||||
!queue.getQueuePath().equals(getQueuePath())) {
|
||||
if (!(newlyParsedQueue instanceof ParentQueue) ||
|
||||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
|
||||
throw new IOException("Trying to reinitialize " + getQueuePath() +
|
||||
" from " + queue.getQueuePath());
|
||||
" from " + newlyParsedQueue.getQueuePath());
|
||||
}
|
||||
|
||||
ParentQueue parentQueue = (ParentQueue)queue;
|
||||
ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
|
||||
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource,
|
||||
parentQueue.capacity, parentQueue.absoluteCapacity,
|
||||
parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
|
||||
parentQueue.state, parentQueue.acls);
|
||||
newlyParsedParentQueue.capacity,
|
||||
newlyParsedParentQueue.absoluteCapacity,
|
||||
newlyParsedParentQueue.maximumCapacity,
|
||||
newlyParsedParentQueue.absoluteMaxCapacity,
|
||||
newlyParsedParentQueue.state,
|
||||
newlyParsedParentQueue.acls);
|
||||
|
||||
// Re-configure existing child queues and add new ones
|
||||
// The CS has already checked to ensure all existing child queues are present!
|
||||
Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
|
||||
Map<String, CSQueue> newChildQueues = getQueues(parentQueue.childQueues);
|
||||
Map<String, CSQueue> newChildQueues =
|
||||
getQueues(newlyParsedParentQueue.childQueues);
|
||||
for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
|
||||
String newChildQueueName = e.getKey();
|
||||
CSQueue newChildQueue = e.getValue();
|
||||
|
||||
CSQueue childQueue = currentChildQueues.get(newChildQueueName);
|
||||
if (childQueue != null){
|
||||
|
||||
// Check if the child-queue already exists
|
||||
if (childQueue != null) {
|
||||
// Re-init existing child queues
|
||||
childQueue.reinitialize(newChildQueue, clusterResource);
|
||||
LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
|
||||
} else {
|
||||
// New child queue, do not re-init
|
||||
|
||||
// Set parent to 'this'
|
||||
newChildQueue.setParent(this);
|
||||
|
||||
// Save in list of current child queues
|
||||
currentChildQueues.put(newChildQueueName, newChildQueue);
|
||||
|
||||
LOG.info(getQueueName() + ": added new child queue: " + newChildQueue);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -378,4 +378,43 @@ public class TestCapacityScheduler {
|
|||
|
||||
Assert.assertEquals(4 * GB, cs.getClusterResources().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueuesWithNewQueue() throws Exception {
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new ClientToAMTokenSecretManagerInRM()));
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
// Add a new queue b4
|
||||
String B4 = B + ".b4";
|
||||
float B4_CAPACITY = 10;
|
||||
|
||||
B3_CAPACITY -= B4_CAPACITY;
|
||||
try {
|
||||
conf.setCapacity(A, 80f);
|
||||
conf.setCapacity(B, 20f);
|
||||
conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"});
|
||||
conf.setCapacity(B1, B1_CAPACITY);
|
||||
conf.setCapacity(B2, B2_CAPACITY);
|
||||
conf.setCapacity(B3, B3_CAPACITY);
|
||||
conf.setCapacity(B4, B4_CAPACITY);
|
||||
cs.reinitialize(conf,null);
|
||||
checkQueueCapacities(cs, 80f, 20f);
|
||||
|
||||
// Verify parent for B4
|
||||
CSQueue rootQueue = cs.getRootQueue();
|
||||
CSQueue queueB = findQueue(rootQueue, B);
|
||||
CSQueue queueB4 = findQueue(queueB, B4);
|
||||
|
||||
assertEquals(queueB, queueB4.getParent());
|
||||
} finally {
|
||||
B3_CAPACITY += B4_CAPACITY;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue