From efe6260762920b7e41391c39d0623b5ffb74159c Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 2 Jan 2015 11:08:47 -0800 Subject: [PATCH] YARN-2991. Fixed DrainDispatcher to reuse the draining code path in AsyncDispatcher. Contributed by Rohith Sharmaks. (cherry picked from commit 947578c1c1413f9043ceb1e87df6a97df048e854) --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../hadoop/yarn/event/AsyncDispatcher.java | 7 +++ .../hadoop/yarn/event/DrainDispatcher.java | 50 +------------------ 3 files changed, 11 insertions(+), 49 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a135f1e78a0..e76fbcc0108 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -273,6 +273,9 @@ Release 2.7.0 - UNRELEASED YARN-2987. Fixed ClientRMService#getQueueInfo to check against queue and app ACLs. (Varun Saxena via jianhe) + YARN-2991. Fixed DrainDispatcher to reuse the draining code path in + AsyncDispatcher. (Rohith Sharmaks via zjshen) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 28be6acf732..d36d841772a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -34,6 +34,8 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import com.google.common.annotations.VisibleForTesting; + /** * Dispatches {@link Event}s in a separate thread. Currently only single thread * does that. Potentially there could be multiple channels for each event type @@ -282,4 +284,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } }; } + + @VisibleForTesting + protected boolean isDrained() { + return this.drained; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 803b2bb2b3b..da5ae443ae0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -23,68 +23,20 @@ import java.util.concurrent.LinkedBlockingQueue; @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { -// flagrant initialize abuse throughout, but safe per -// http://java.sun.com/docs/books/jls/third_edition/html/typesValues.html#96595 -// and similar grotesqueries - private volatile boolean drained = false; - private final BlockingQueue queue; - final Object mutex; - public DrainDispatcher() { this(new LinkedBlockingQueue()); } private DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); - this.queue = eventQueue; - this.mutex = this; } /** * Busy loop waiting for all queued events to drain. */ public void await() { - while (!drained) { + while (!isDrained()) { Thread.yield(); } } - - @Override - Runnable createThread() { - return new Runnable() { - @Override - public void run() { - while (!Thread.currentThread().isInterrupted()) { - synchronized (mutex) { - // !drained if dispatch queued new events on this dispatcher - drained = queue.isEmpty(); - } - Event event; - try { - event = queue.take(); - } catch(InterruptedException ie) { - return; - } - if (event != null) { - dispatch(event); - } - } - } - }; - } - - @Override - public EventHandler getEventHandler() { - final EventHandler actual = super.getEventHandler(); - return new EventHandler() { - @Override - public void handle(Event event) { - synchronized (mutex) { - actual.handle(event); - drained = false; - } - } - }; - } - }