YARN-5436. Race in AsyncDispatcher can cause random test failures in Tez (probably YARN also). (Zhiyuan Yang via gtcarrera9)
(cherry picked from commit 7086fc72ee
)
This commit is contained in:
parent
24d464a150
commit
0b5d96abb5
|
@ -59,6 +59,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
|
|
||||||
// Indicates all the remaining dispatcher's events on stop have been drained
|
// Indicates all the remaining dispatcher's events on stop have been drained
|
||||||
// and processed.
|
// and processed.
|
||||||
|
// Race condition happens if dispatcher thread sets drained to true between
|
||||||
|
// handler setting drained to false and enqueueing event. YARN-3878 decided
|
||||||
|
// to ignore it because of its tiny impact. Also see YARN-5436.
|
||||||
private volatile boolean drained = true;
|
private volatile boolean drained = true;
|
||||||
private final Object waitForDrained = new Object();
|
private final Object waitForDrained = new Object();
|
||||||
|
|
||||||
|
@ -300,9 +303,4 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
protected boolean isEventThreadWaiting() {
|
protected boolean isEventThreadWaiting() {
|
||||||
return eventHandlingThread.getState() == Thread.State.WAITING;
|
return eventHandlingThread.getState() == Thread.State.WAITING;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected boolean isDrained() {
|
|
||||||
return this.drained;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,10 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public class DrainDispatcher extends AsyncDispatcher {
|
public class DrainDispatcher extends AsyncDispatcher {
|
||||||
|
private volatile boolean drained = false;
|
||||||
|
private volatile boolean stopped = false;
|
||||||
|
private final BlockingQueue<Event> queue;
|
||||||
|
private final Object mutex;
|
||||||
|
|
||||||
public DrainDispatcher() {
|
public DrainDispatcher() {
|
||||||
this(new LinkedBlockingQueue<Event>());
|
this(new LinkedBlockingQueue<Event>());
|
||||||
|
@ -29,6 +33,8 @@ public class DrainDispatcher extends AsyncDispatcher {
|
||||||
|
|
||||||
public DrainDispatcher(BlockingQueue<Event> eventQueue) {
|
public DrainDispatcher(BlockingQueue<Event> eventQueue) {
|
||||||
super(eventQueue);
|
super(eventQueue);
|
||||||
|
this.queue = eventQueue;
|
||||||
|
this.mutex = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,8 +50,53 @@ public class DrainDispatcher extends AsyncDispatcher {
|
||||||
* Busy loop waiting for all queued events to drain.
|
* Busy loop waiting for all queued events to drain.
|
||||||
*/
|
*/
|
||||||
public void await() {
|
public void await() {
|
||||||
while (!isDrained()) {
|
while (!drained) {
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Runnable createThread() {
|
||||||
|
return new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||||
|
synchronized (mutex) {
|
||||||
|
// !drained if dispatch queued new events on this dispatcher
|
||||||
|
drained = queue.isEmpty();
|
||||||
|
}
|
||||||
|
Event event;
|
||||||
|
try {
|
||||||
|
event = queue.take();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (event != null) {
|
||||||
|
dispatch(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public EventHandler getEventHandler() {
|
||||||
|
final EventHandler actual = super.getEventHandler();
|
||||||
|
return new EventHandler() {
|
||||||
|
@Override
|
||||||
|
public void handle(Event event) {
|
||||||
|
synchronized (mutex) {
|
||||||
|
actual.handle(event);
|
||||||
|
drained = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
stopped = true;
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue