YARN-5436. Race in AsyncDispatcher can cause random test failures in Tez (probably YARN also). (Zhiyuan Yang via gtcarrera9)

This commit is contained in:
Li Lu 2016-07-28 16:50:57 -07:00
parent d9aae22fdf
commit 7086fc72ee
2 changed files with 55 additions and 6 deletions

View File

@ -59,6 +59,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
// Indicates all the remaining dispatcher's events on stop have been drained
// and processed.
// Race condition happens if dispatcher thread sets drained to true between
// handler setting drained to false and enqueueing event. YARN-3878 decided
// to ignore it because of its tiny impact. Also see YARN-5436.
private volatile boolean drained = true;
private final Object waitForDrained = new Object();
@ -300,9 +303,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
protected boolean isEventThreadWaiting() {
return eventHandlingThread.getState() == Thread.State.WAITING;
}
@VisibleForTesting
protected boolean isDrained() {
return this.drained;
}
}

View File

@ -22,6 +22,10 @@ import java.util.concurrent.LinkedBlockingQueue;
@SuppressWarnings("rawtypes")
public class DrainDispatcher extends AsyncDispatcher {
private volatile boolean drained = false;
private volatile boolean stopped = false;
private final BlockingQueue<Event> queue;
private final Object mutex;
public DrainDispatcher() {
this(new LinkedBlockingQueue<Event>());
@ -29,6 +33,8 @@ public class DrainDispatcher extends AsyncDispatcher {
public DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue);
this.queue = eventQueue;
this.mutex = this;
}
/**
@ -44,8 +50,53 @@ public class DrainDispatcher extends AsyncDispatcher {
* Busy loop waiting for all queued events to drain.
*/
public void await() {
while (!isDrained()) {
while (!drained) {
Thread.yield();
}
}
@Override
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (mutex) {
// !drained if dispatch queued new events on this dispatcher
drained = queue.isEmpty();
}
Event event;
try {
event = queue.take();
} catch (InterruptedException ie) {
return;
}
if (event != null) {
dispatch(event);
}
}
}
};
}
@SuppressWarnings("unchecked")
@Override
public EventHandler getEventHandler() {
final EventHandler actual = super.getEventHandler();
return new EventHandler() {
@Override
public void handle(Event event) {
synchronized (mutex) {
actual.handle(event);
drained = false;
}
}
};
}
@Override
protected void serviceStop() throws Exception {
stopped = true;
super.serviceStop();
}
}