YARN-5911. DrainDispatcher does not drain all events on stop even if setDrainEventsOnStop is true. Contributed by Varun Saxena.

This commit is contained in:
Naganarasimha 2016-11-23 08:44:58 +05:30
parent 2bf9a15e8a
commit 4667564162
3 changed files with 48 additions and 9 deletions

View File

@ -151,7 +151,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
while (!isDrained() && eventHandlingThread != null
&& eventHandlingThread.isAlive()
&& System.currentTimeMillis() < endTime) {
waitForDrained.wait(1000);
waitForDrained.wait(100);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState());
}
@ -308,4 +308,8 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
protected boolean isDrained() {
return drained;
}
protected boolean isStopped() {
return stopped;
}
}

View File

@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
@SuppressWarnings("rawtypes")
public class DrainDispatcher extends AsyncDispatcher {
private volatile boolean drained = false;
private volatile boolean stopped = false;
private final BlockingQueue<Event> queue;
private final Object mutex;
@ -69,7 +68,7 @@ public class DrainDispatcher extends AsyncDispatcher {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
while (!isStopped() && !Thread.currentThread().isInterrupted()) {
synchronized (mutex) {
// !drained if dispatch queued new events on this dispatcher
drained = queue.isEmpty();
@ -109,10 +108,4 @@ public class DrainDispatcher extends AsyncDispatcher {
return drained;
}
}
@Override
protected void serviceStop() throws Exception {
stopped = true;
super.serviceStop();
}
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
public class TestAsyncDispatcher {
@ -77,5 +78,46 @@ public class TestAsyncDispatcher {
disp.waitForEventThreadToWait();
disp.close();
}
@SuppressWarnings("rawtypes")
private static class DummyHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {}
}
}
private enum DummyType {
DUMMY
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void dispatchDummyEvents(Dispatcher disp, int count) {
for (int i = 0; i < count; i++) {
Event event = mock(Event.class);
when(event.getType()).thenReturn(DummyType.DUMMY);
disp.getEventHandler().handle(event);
}
}
// Test if drain dispatcher drains events on stop.
@SuppressWarnings({ "rawtypes" })
@Test(timeout=10000)
public void testDrainDispatcherDrainEventsOnStop() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000);
BlockingQueue<Event> queue = new LinkedBlockingQueue<Event>();
DrainDispatcher disp = new DrainDispatcher(queue);
disp.init(conf);
disp.register(DummyType.class, new DummyHandler());
disp.setDrainEventsOnStop();
disp.start();
disp.waitForEventThreadToWait();
dispatchDummyEvents(disp, 2);
disp.close();
assertEquals(0, queue.size());
}
}