YARN-6215. FairScheduler preemption and update should not run concurrently. (Tao Jie via kasha)
(cherry picked from commit 815d53506f
)
This commit is contained in:
parent
f3cdf29af4
commit
a95d3e1fe7
|
@ -32,6 +32,7 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thread that handles FairScheduler preemption.
|
* Thread that handles FairScheduler preemption.
|
||||||
|
@ -43,6 +44,7 @@ class FSPreemptionThread extends Thread {
|
||||||
private final long warnTimeBeforeKill;
|
private final long warnTimeBeforeKill;
|
||||||
private final long delayBeforeNextStarvationCheck;
|
private final long delayBeforeNextStarvationCheck;
|
||||||
private final Timer preemptionTimer;
|
private final Timer preemptionTimer;
|
||||||
|
private final Lock schedulerReadLock;
|
||||||
|
|
||||||
FSPreemptionThread(FairScheduler scheduler) {
|
FSPreemptionThread(FairScheduler scheduler) {
|
||||||
setDaemon(true);
|
setDaemon(true);
|
||||||
|
@ -61,6 +63,7 @@ class FSPreemptionThread extends Thread {
|
||||||
: 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
|
: 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
|
||||||
delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
|
delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
|
||||||
fsConf.getWaitTimeBeforeNextStarvationCheck();
|
fsConf.getWaitTimeBeforeNextStarvationCheck();
|
||||||
|
schedulerReadLock = scheduler.getSchedulerReadLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
|
@ -68,7 +71,14 @@ class FSPreemptionThread extends Thread {
|
||||||
FSAppAttempt starvedApp;
|
FSAppAttempt starvedApp;
|
||||||
try{
|
try{
|
||||||
starvedApp = context.getStarvedApps().take();
|
starvedApp = context.getStarvedApps().take();
|
||||||
|
// Hold the scheduler readlock so this is not concurrent with the
|
||||||
|
// update thread.
|
||||||
|
schedulerReadLock.lock();
|
||||||
|
try {
|
||||||
preemptContainers(identifyContainersToPreempt(starvedApp));
|
preemptContainers(identifyContainersToPreempt(starvedApp));
|
||||||
|
} finally {
|
||||||
|
schedulerReadLock.unlock();
|
||||||
|
}
|
||||||
starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
|
starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.info("Preemption thread interrupted! Exiting.");
|
LOG.info("Preemption thread interrupted! Exiting.");
|
||||||
|
|
|
@ -33,6 +33,7 @@ import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -1786,4 +1787,8 @@ public class FairScheduler extends
|
||||||
long getNMHeartbeatInterval() {
|
long getNMHeartbeatInterval() {
|
||||||
return nmHeartbeatInterval;
|
return nmHeartbeatInterval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ReadLock getSchedulerReadLock() {
|
||||||
|
return this.readLock;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue