YARN-6172. FSLeafQueue demand update needs to be atomic. (Miklos Szegedi via kasha)

This commit is contained in:
Karthik Kambatla 2017-02-26 20:36:33 -08:00
parent 815d53506f
commit fbfe86deea
2 changed files with 26 additions and 19 deletions

View File

@ -331,20 +331,22 @@ public class FSLeafQueue extends FSQueue {
public void updateDemand() { public void updateDemand() {
// Compute demand by iterating through apps in the queue // Compute demand by iterating through apps in the queue
// Limit demand to maxResources // Limit demand to maxResources
demand = Resources.createResource(0); Resource tmpDemand = Resources.createResource(0);
readLock.lock(); readLock.lock();
try { try {
for (FSAppAttempt sched : runnableApps) { for (FSAppAttempt sched : runnableApps) {
updateDemandForApp(sched); sched.updateDemand();
Resources.addTo(tmpDemand, sched.getDemand());
} }
for (FSAppAttempt sched : nonRunnableApps) { for (FSAppAttempt sched : nonRunnableApps) {
updateDemandForApp(sched); sched.updateDemand();
Resources.addTo(tmpDemand, sched.getDemand());
} }
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
// Cap demand to maxShare to limit allocation to maxShare // Cap demand to maxShare to limit allocation to maxShare
demand = Resources.componentwiseMin(demand, maxShare); demand = Resources.componentwiseMin(tmpDemand, maxShare);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxShare); + "; the max is " + maxShare);
@ -353,17 +355,6 @@ public class FSLeafQueue extends FSQueue {
} }
} }
private void updateDemandForApp(FSAppAttempt sched) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) {
LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
+ "; Total resource demand for " + getName() + " now "
+ demand);
}
demand = Resources.add(demand, toAdd);
}
@Override @Override
public Resource assignContainer(FSSchedulerNode node) { public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = none(); Resource assigned = none();

View File

@ -96,6 +96,14 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
public void testPreemptionEnabled() throws Exception { public void testPreemptionEnabled() throws Exception {
setupClusterAndSubmitJobs(); setupClusterAndSubmitJobs();
// Wait for apps to be processed by MockPreemptionThread
for (int i = 0; i < 6000; ++i) {
if (preemptionThread.uniqueAppsAdded() >= 3) {
break;
}
Thread.sleep(10);
}
assertNotNull("FSContext does not have an FSStarvedApps instance", assertNotNull("FSContext does not have an FSStarvedApps instance",
scheduler.getContext().getStarvedApps()); scheduler.getContext().getStarvedApps());
assertEquals("Expecting 3 starved applications, one each for the " assertEquals("Expecting 3 starved applications, one each for the "
@ -113,8 +121,19 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
clock.tickMsec( clock.tickMsec(
FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS); FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS);
scheduler.update(); scheduler.update();
// Wait for apps to be processed by MockPreemptionThread
for (int i = 0; i < 6000; ++i) {
if(preemptionThread.totalAppsAdded() >
preemptionThread.uniqueAppsAdded()) {
break;
}
Thread.sleep(10);
}
assertTrue("Each app is marked as starved exactly once", assertTrue("Each app is marked as starved exactly once",
preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded()); preemptionThread.totalAppsAdded() >
preemptionThread.uniqueAppsAdded());
} }
/* /*
@ -154,9 +173,6 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
// Scheduler update to populate starved apps // Scheduler update to populate starved apps
scheduler.update(); scheduler.update();
// Wait for apps to be processed by MockPreemptionThread
Thread.yield();
} }
/** /**