YARN-2991. Fixed DrainDispatcher to reuse the draining code path in AsyncDispatcher. Contributed by Rohith Sharmaks.

(cherry picked from commit 947578c1c1)
This commit is contained in:
Zhijie Shen 2015-01-02 11:08:47 -08:00
parent 05df432fab
commit efe6260762
3 changed files with 11 additions and 49 deletions

View File

@ -273,6 +273,9 @@ Release 2.7.0 - UNRELEASED
YARN-2987. Fixed ClientRMService#getQueueInfo to check against queue and
app ACLs. (Varun Saxena via jianhe)
YARN-2991. Fixed DrainDispatcher to reuse the draining code path in
AsyncDispatcher. (Rohith Sharmaks via zjshen)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -34,6 +34,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
/**
* Dispatches {@link Event}s in a separate thread. Currently only single thread
* does that. Potentially there could be multiple channels for each event type
@ -282,4 +284,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
}
};
}
@VisibleForTesting
protected boolean isDrained() {
return this.drained;
}
}

View File

@ -23,68 +23,20 @@ import java.util.concurrent.LinkedBlockingQueue;
@SuppressWarnings("rawtypes")
public class DrainDispatcher extends AsyncDispatcher {
// flagrant initialize abuse throughout, but safe per
// http://java.sun.com/docs/books/jls/third_edition/html/typesValues.html#96595
// and similar grotesqueries
private volatile boolean drained = false;
private final BlockingQueue<Event> queue;
final Object mutex;
public DrainDispatcher() {
this(new LinkedBlockingQueue<Event>());
}
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue);
this.queue = eventQueue;
this.mutex = this;
}
/**
* Busy loop waiting for all queued events to drain.
*/
public void await() {
while (!drained) {
while (!isDrained()) {
Thread.yield();
}
}
@Override
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
synchronized (mutex) {
// !drained if dispatch queued new events on this dispatcher
drained = queue.isEmpty();
}
Event event;
try {
event = queue.take();
} catch(InterruptedException ie) {
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
@Override
public EventHandler getEventHandler() {
final EventHandler actual = super.getEventHandler();
return new EventHandler() {
@Override
public void handle(Event event) {
synchronized (mutex) {
actual.handle(event);
drained = false;
}
}
};
}
}