diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index af73c10f796..65df0c2ea82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.locks.Lock; /** * Thread that handles FairScheduler preemption. @@ -43,6 +44,7 @@ class FSPreemptionThread extends Thread { private final long warnTimeBeforeKill; private final long delayBeforeNextStarvationCheck; private final Timer preemptionTimer; + private final Lock schedulerReadLock; FSPreemptionThread(FairScheduler scheduler) { setDaemon(true); @@ -61,6 +63,7 @@ class FSPreemptionThread extends Thread { : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay + fsConf.getWaitTimeBeforeNextStarvationCheck(); + schedulerReadLock = scheduler.getSchedulerReadLock(); } public void run() { @@ -68,7 +71,14 @@ class FSPreemptionThread extends Thread { FSAppAttempt starvedApp; try{ starvedApp = context.getStarvedApps().take(); - preemptContainers(identifyContainersToPreempt(starvedApp)); + // Hold the scheduler readlock so this is not concurrent with the + // update thread. + schedulerReadLock.lock(); + try { + preemptContainers(identifyContainersToPreempt(starvedApp)); + } finally { + schedulerReadLock.unlock(); + } starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck); } catch (InterruptedException e) { LOG.info("Preemption thread interrupted! Exiting."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index c946bfb38bb..324677872bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -104,6 +104,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; /** * A scheduler that schedules resources between a set of queues. The scheduler @@ -1782,4 +1783,8 @@ public class FairScheduler extends long getNMHeartbeatInterval() { return nmHeartbeatInterval; } + + ReadLock getSchedulerReadLock() { + return this.readLock; + } }