YARN-5911. DrainDispatcher does not drain all events on stop even if setDrainEventsOnStop is true. Contributed by Varun Saxena.

(cherry picked from commit 4667564162)
This commit is contained in:
Naganarasimha 2016-11-23 08:44:58 +05:30
parent c11e0ef621
commit b96ed43897
3 changed files with 48 additions and 9 deletions

View File

@ -151,7 +151,7 @@ protected void serviceStop() throws Exception {
while (!isDrained() && eventHandlingThread != null
&& eventHandlingThread.isAlive()
&& System.currentTimeMillis() < endTime) {
waitForDrained.wait(1000);
waitForDrained.wait(100);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());
}
@ -308,4 +308,8 @@ protected boolean isEventThreadWaiting() {
protected boolean isDrained() {
return drained;
}
protected boolean isStopped() {
return stopped;
}
}

View File

@ -25,7 +25,6 @@
@SuppressWarnings("rawtypes")
public class DrainDispatcher extends AsyncDispatcher {
private volatile boolean drained = false;
private volatile boolean stopped = false;
private final BlockingQueue<Event> queue;
private final Object mutex;
@ -69,7 +68,7 @@ Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
while (!isStopped() && !Thread.currentThread().isInterrupted()) {
synchronized (mutex) {
// !drained if dispatch queued new events on this dispatcher
drained = queue.isEmpty();
@ -109,10 +108,4 @@ protected boolean isDrained() {
return drained;
}
}
@Override
protected void serviceStop() throws Exception {
stopped = true;
super.serviceStop();
}
}

View File

@ -27,6 +27,7 @@
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
public class TestAsyncDispatcher {
@ -77,5 +78,46 @@ public void testDispatchStopOnTimeout() throws Exception {
disp.waitForEventThreadToWait();
disp.close();
}
@SuppressWarnings("rawtypes")
private static class DummyHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {}
}
}
private enum DummyType {
DUMMY
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void dispatchDummyEvents(Dispatcher disp, int count) {
for (int i = 0; i < count; i++) {
Event event = mock(Event.class);
when(event.getType()).thenReturn(DummyType.DUMMY);
disp.getEventHandler().handle(event);
}
}
// Test if drain dispatcher drains events on stop.
@SuppressWarnings({ "rawtypes" })
@Test(timeout=10000)
public void testDrainDispatcherDrainEventsOnStop() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000);
BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
DrainDispatcher disp = new DrainDispatcher(queue);
disp.init(conf);
disp.register(DummyType.class, new DummyHandler());
disp.setDrainEventsOnStop();
disp.start();
disp.waitForEventThreadToWait();
dispatchDummyEvents(disp, 2);
disp.close();
assertEquals(0, queue.size());
}
}