diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 41fd5bfd5a3..6f8567e205d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -59,6 +59,9 @@ Release 2.6.0 - UNRELEASED YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers when nodes resync during work-preserving RM restart. (Jian He via vinodkv) + YARN-2264. Fixed a race condition in DrainDispatcher which may cause random + test failures. (Li Lu via jianhe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index e79e7b360ef..803b2bb2b3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -28,6 +28,7 @@ public class DrainDispatcher extends AsyncDispatcher { // and similar grotesqueries private volatile boolean drained = false; private final BlockingQueue queue; + final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); @@ -36,6 +37,7 @@ public DrainDispatcher() { private DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); this.queue = eventQueue; + this.mutex = this; } /** @@ -53,8 +55,10 @@ Runnable createThread() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { - // !drained if dispatch queued new events on this dispatcher - drained = queue.isEmpty(); + synchronized (mutex) { + // !drained if dispatch queued new events on this dispatcher + drained = queue.isEmpty(); + } Event event; try { event = queue.take(); @@ -75,8 +79,10 @@ public EventHandler getEventHandler() { return new EventHandler() { @Override public void handle(Event event) { - drained = false; - actual.handle(event); + synchronized (mutex) { + actual.handle(event); + drained = false; + } } }; }