YARN-1121. Addendum patch. Fixed AsyncDispatcher hang issue during stop due to a race condition caused by the previous patch. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1554344 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
50480f892a
commit
460ac8cb50
|
@ -56,6 +56,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
// Indicates all the remaining dispatcher's events on stop have been drained
|
// Indicates all the remaining dispatcher's events on stop have been drained
|
||||||
// and processed.
|
// and processed.
|
||||||
private volatile boolean drained = true;
|
private volatile boolean drained = true;
|
||||||
|
private Object waitForDrained = new Object();
|
||||||
|
|
||||||
// For drainEventsOnStop enabled only, block newly coming events into the
|
// For drainEventsOnStop enabled only, block newly coming events into the
|
||||||
// queue while stopping.
|
// queue while stopping.
|
||||||
|
@ -82,6 +83,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
public void run() {
|
public void run() {
|
||||||
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||||
drained = eventQueue.isEmpty();
|
drained = eventQueue.isEmpty();
|
||||||
|
// blockNewEvents is only set when dispatcher is draining to stop,
|
||||||
|
// adding this check is to avoid the overhead of acquiring the lock
|
||||||
|
// and calling notify every time in the normal run of the loop.
|
||||||
|
if (blockNewEvents) {
|
||||||
|
synchronized (waitForDrained) {
|
||||||
|
if (drained) {
|
||||||
|
waitForDrained.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Event event;
|
Event event;
|
||||||
try {
|
try {
|
||||||
event = eventQueue.take();
|
event = eventQueue.take();
|
||||||
|
@ -125,8 +136,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
if (drainEventsOnStop) {
|
if (drainEventsOnStop) {
|
||||||
blockNewEvents = true;
|
blockNewEvents = true;
|
||||||
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
|
LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
|
||||||
while(!drained) {
|
synchronized (waitForDrained) {
|
||||||
Thread.yield();
|
while (!drained && eventHandlingThread.isAlive()) {
|
||||||
|
waitForDrained.wait(1000);
|
||||||
|
LOG.info("Waiting for AsyncDispatcher to drain.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stopped = true;
|
stopped = true;
|
||||||
|
|
Loading…
Reference in New Issue