YARN-525. make CS node-locality-delay refreshable. Contributed by Thomas Graves
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1465009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0836d68abf
commit
18757c1267
|
@ -509,6 +509,8 @@ Release 0.23.7 - UNRELEASED
|
|||
YARN-200. yarn log does not output all needed information, and is in a
|
||||
binary format (Ravi Prakash via jlowe)
|
||||
|
||||
YARN-525. make CS node-locality-delay refreshable (Thomas Graves via jlowe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-357. App submission should not be synchronized (daryn)
|
||||
|
|
|
@ -89,6 +89,8 @@ public class LeafQueue implements CSQueue {
|
|||
private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
|
||||
private int maxActiveApplicationsPerUser;
|
||||
|
||||
private int nodeLocalityDelay;
|
||||
|
||||
private Resource usedResources = Resources.createResource(0, 0);
|
||||
private float usedCapacity = 0.0f;
|
||||
private volatile int numContainers;
|
||||
|
@ -123,8 +125,6 @@ public class LeafQueue implements CSQueue {
|
|||
|
||||
private final ActiveUsersManager activeUsersManager;
|
||||
|
||||
private final int nodeLocalityDelay;
|
||||
|
||||
private final ResourceCalculator resourceCalculator;
|
||||
|
||||
public LeafQueue(CapacitySchedulerContext cs,
|
||||
|
@ -196,9 +196,6 @@ public class LeafQueue implements CSQueue {
|
|||
Map<QueueACL, AccessControlList> acls =
|
||||
cs.getConfiguration().getAcls(getQueuePath());
|
||||
|
||||
this.nodeLocalityDelay =
|
||||
cs.getConfiguration().getNodeLocalityDelay();
|
||||
|
||||
setupQueueConfigs(
|
||||
cs.getClusterResources(),
|
||||
capacity, absoluteCapacity,
|
||||
|
@ -206,7 +203,7 @@ public class LeafQueue implements CSQueue {
|
|||
userLimit, userLimitFactor,
|
||||
maxApplications, maxApplicationsPerUser,
|
||||
maxActiveApplications, maxActiveApplicationsPerUser,
|
||||
state, acls);
|
||||
state, acls, cs.getConfiguration().getNodeLocalityDelay());
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("LeafQueue:" + " name=" + queueName
|
||||
|
@ -227,7 +224,8 @@ public class LeafQueue implements CSQueue {
|
|||
int userLimit, float userLimitFactor,
|
||||
int maxApplications, int maxApplicationsPerUser,
|
||||
int maxActiveApplications, int maxActiveApplicationsPerUser,
|
||||
QueueState state, Map<QueueACL, AccessControlList> acls)
|
||||
QueueState state, Map<QueueACL, AccessControlList> acls,
|
||||
int nodeLocalityDelay)
|
||||
{
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
|
@ -256,6 +254,8 @@ public class LeafQueue implements CSQueue {
|
|||
this.queueInfo.setCapacity(this.capacity);
|
||||
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
|
||||
this.queueInfo.setQueueState(this.state);
|
||||
|
||||
this.nodeLocalityDelay = nodeLocalityDelay;
|
||||
|
||||
StringBuilder aclsString = new StringBuilder();
|
||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||
|
@ -319,7 +319,8 @@ public class LeafQueue implements CSQueue {
|
|||
"state = " + state +
|
||||
" [= configuredState ]" + "\n" +
|
||||
"acls = " + aclsString +
|
||||
" [= configuredAcls ]" + "\n");
|
||||
" [= configuredAcls ]" + "\n" +
|
||||
"nodeLocalityDelay = " + nodeLocalityDelay + "\n");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -605,7 +606,8 @@ public class LeafQueue implements CSQueue {
|
|||
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
|
||||
newlyParsedLeafQueue.getMaximumActiveApplications(),
|
||||
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
|
||||
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
|
||||
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
|
||||
newlyParsedLeafQueue.getNodeLocalityDelay());
|
||||
|
||||
// queue metrics are updated, more resource may be available
|
||||
// activate the pending applications if possible
|
||||
|
|
|
@ -1623,6 +1623,30 @@ public class TestLeafQueue {
|
|||
assertEquals(3, e.activeApplications.size());
|
||||
assertEquals(0, e.pendingApplications.size());
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testNodeLocalityAfterQueueRefresh() throws Exception {
|
||||
|
||||
// Manipulate queue 'e'
|
||||
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
|
||||
|
||||
// before reinitialization
|
||||
assertEquals(0, e.getNodeLocalityDelay());
|
||||
|
||||
csConf.setInt(CapacitySchedulerConfiguration
|
||||
.NODE_LOCALITY_DELAY, 60);
|
||||
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
||||
CSQueue newRoot =
|
||||
CapacityScheduler.parseQueue(csContext, csConf, null,
|
||||
CapacitySchedulerConfiguration.ROOT,
|
||||
newQueues, queues,
|
||||
TestUtils.spyHook);
|
||||
queues = newQueues;
|
||||
root.reinitialize(newRoot, cs.getClusterResources());
|
||||
|
||||
// after reinitialization
|
||||
assertEquals(60, e.getNodeLocalityDelay());
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testActivateApplicationByUpdatingClusterResource()
|
||||
|
|
Loading…
Reference in New Issue