From b96ed4389701b0b03d5069f25c67f36affb86fc9 Mon Sep 17 00:00:00 2001 From: Naganarasimha Date: Wed, 23 Nov 2016 08:44:58 +0530 Subject: [PATCH] YARN-5911. DrainDispatcher does not drain all events on stop even if setDrainEventsOnStop is true. Contributed by Varun Saxena. (cherry picked from commit 466756416214a4bbc77af8a29da1a33e01106864) --- .../hadoop/yarn/event/AsyncDispatcher.java | 6 ++- .../hadoop/yarn/event/DrainDispatcher.java | 9 +--- .../yarn/event/TestAsyncDispatcher.java | 42 +++++++++++++++++++ 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 42a68193a7f..94bfab61ee4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -151,7 +151,7 @@ protected void serviceStop() throws Exception { while (!isDrained() && eventHandlingThread != null && eventHandlingThread.isAlive() && System.currentTimeMillis() < endTime) { - waitForDrained.wait(1000); + waitForDrained.wait(100); LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + eventHandlingThread.getState()); } @@ -308,4 +308,8 @@ protected boolean isEventThreadWaiting() { protected boolean isDrained() { return drained; } + + protected boolean isStopped() { + return stopped; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 1369465c2da..c5ba07222d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -25,7 +25,6 @@ @SuppressWarnings("rawtypes") public class DrainDispatcher extends AsyncDispatcher { private volatile boolean drained = false; - private volatile boolean stopped = false; private final BlockingQueue queue; private final Object mutex; @@ -69,7 +68,7 @@ Runnable createThread() { return new Runnable() { @Override public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { + while (!isStopped() && !Thread.currentThread().isInterrupted()) { synchronized (mutex) { // !drained if dispatch queued new events on this dispatcher drained = queue.isEmpty(); @@ -109,10 +108,4 @@ protected boolean isDrained() { return drained; } } - - @Override - protected void serviceStop() throws Exception { - stopped = true; - super.serviceStop(); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 018096b6621..2b9d7455a9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -27,6 +27,7 @@ import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; public class TestAsyncDispatcher { @@ -77,5 +78,46 @@ public void testDispatchStopOnTimeout() throws Exception { disp.waitForEventThreadToWait(); disp.close(); } + + @SuppressWarnings("rawtypes") + private static class DummyHandler implements EventHandler { + @Override + public void handle(Event event) { + try { + Thread.sleep(500); + } catch (InterruptedException e) {} + } + } + + private enum DummyType { + DUMMY + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void dispatchDummyEvents(Dispatcher disp, int count) { + for (int i = 0; i < count; i++) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(DummyType.DUMMY); + disp.getEventHandler().handle(event); + } + } + + // Test if drain dispatcher drains events on stop. + @SuppressWarnings({ "rawtypes" }) + @Test(timeout=10000) + public void testDrainDispatcherDrainEventsOnStop() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000); + BlockingQueue queue = new LinkedBlockingQueue(); + DrainDispatcher disp = new DrainDispatcher(queue); + disp.init(conf); + disp.register(DummyType.class, new DummyHandler()); + disp.setDrainEventsOnStop(); + disp.start(); + disp.waitForEventThreadToWait(); + dispatchDummyEvents(disp, 2); + disp.close(); + assertEquals(0, queue.size()); + } }