From 5f698ffb57d5866255bf8a94a59bb2b11b341650 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 31 Dec 2013 01:10:29 +0000 Subject: [PATCH] YARN-1121. Addendum patch. Fixed AsyncDispatcher hang issue during stop due to a race condition caused by the previous patch. Contributed by Jian He. svn merge --ignore-ancestry -c 1554344 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1554345 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/yarn/event/AsyncDispatcher.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index bf5058a9d13..733f0eaabc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -56,6 +56,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { // Indicates all the remaining dispatcher's events on stop have been drained // and processed. private volatile boolean drained = true; + private Object waitForDrained = new Object(); // For drainEventsOnStop enabled only, block newly coming events into the // queue while stopping. @@ -82,6 +83,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); + // blockNewEvents is only set when dispatcher is draining to stop, + // adding this check is to avoid the overhead of acquiring the lock + // and calling notify every time in the normal run of the loop. + if (blockNewEvents) { + synchronized (waitForDrained) { + if (drained) { + waitForDrained.notify(); + } + } + } Event event; try { event = eventQueue.take(); @@ -125,8 +136,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { if (drainEventsOnStop) { blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); - while(!drained) { - Thread.yield(); + synchronized (waitForDrained) { + while (!drained && eventHandlingThread.isAlive()) { + waitForDrained.wait(1000); + LOG.info("Waiting for AsyncDispatcher to drain."); + } } } stopped = true;