diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index f5361c895cf..5dea1c83cfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -59,6 +59,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { // Indicates all the remaining dispatcher's events on stop have been drained // and processed. + // Race condition happens if dispatcher thread sets drained to true between + // handler setting drained to false and enqueueing event. YARN-3878 decided + // to ignore it because of its tiny impact. Also see YARN-5436. private volatile boolean drained = true; private final Object waitForDrained = new Object(); @@ -300,9 +303,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { protected boolean isEventThreadWaiting() { return eventHandlingThread.getState() == Thread.State.WAITING; } - - @VisibleForTesting - protected boolean isDrained() { - return this.drained; - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index e4a5a82a165..cf4b1b54d17 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -22,6 +22,10 @@ import java.util.concurrent.LinkedBlockingQueue; @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { + private volatile boolean drained = false; + private volatile boolean stopped = false; + private final BlockingQueue queue; + private final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); @@ -29,6 +33,8 @@ public class DrainDispatcher extends AsyncDispatcher { public DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); + this.queue = eventQueue; + this.mutex = this; } /** @@ -44,8 +50,53 @@ public class DrainDispatcher extends AsyncDispatcher { * Busy loop waiting for all queued events to drain. */ public void await() { - while (!isDrained()) { + while (!drained) { Thread.yield(); } } + + @Override + Runnable createThread() { + return new Runnable() { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + synchronized (mutex) { + // !drained if dispatch queued new events on this dispatcher + drained = queue.isEmpty(); + } + Event event; + try { + event = queue.take(); + } catch (InterruptedException ie) { + return; + } + if (event != null) { + dispatch(event); + } + } + } + }; + } + + @SuppressWarnings("unchecked") + @Override + public EventHandler getEventHandler() { + final EventHandler actual = super.getEventHandler(); + return new EventHandler() { + @Override + public void handle(Event event) { + synchronized (mutex) { + actual.handle(event); + drained = false; + } + } + }; + } + + @Override + protected void serviceStop() throws Exception { + stopped = true; + super.serviceStop(); + } }