diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 81448c5589d..acfba27f6dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -20,11 +20,11 @@ package org.apache.hadoop.yarn.event; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -265,11 +265,16 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } class GenericEventHandler implements EventHandler { - private void printEventQueueDetails(BlockingQueue queue) { - Map counterMap = eventQueue.stream(). - collect(Collectors. - groupingBy(e -> e.getType(), Collectors.counting()) - ); + private void printEventQueueDetails() { + Iterator iterator = eventQueue.iterator(); + Map counterMap = new HashMap<>(); + while (iterator.hasNext()) { + Enum eventType = iterator.next().getType(); + if (!counterMap.containsKey(eventType)) { + counterMap.put(eventType, 0L); + } + counterMap.put(eventType, counterMap.get(eventType) + 1); + } for (Map.Entry entry : counterMap.entrySet()) { long num = entry.getValue(); LOG.info("Event type: " + entry.getKey() @@ -292,7 +297,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { if (qSize != 0 && qSize % detailsInterval == 0 && lastEventDetailsQueueSizeLogged != qSize) { lastEventDetailsQueueSizeLogged = qSize; - printEventQueueDetails(eventQueue); + printEventQueueDetails(); printTrigger = true; } int remCapacity = eventQueue.remainingCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 53e2d0539c4..88855fca4fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -97,12 +97,23 @@ public class TestAsyncDispatcher { } private static class TestHandler implements EventHandler { + + private long sleepTime = 1500; + + TestHandler() { + } + + TestHandler(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override public void handle(Event event) { try { // As long as 10000 events queued - Thread.sleep(1500); - } catch (InterruptedException e) {} + Thread.sleep(this.sleepTime); + } catch (InterruptedException e) { + } } } @@ -170,11 +181,54 @@ public class TestAsyncDispatcher { //Make sure more than one event to take verify(log, atLeastOnce()). info("Latest dispatch event type: TestEventType"); - dispatcher.stop(); } finally { //... restore logger object logger.set(null, oldLog); + dispatcher.stop(); } } + + //Test print dispatcher details when the blocking queue is heavy + @Test(timeout = 60000) + public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception { + for (int i = 0; i < 5; i++) { + testPrintDispatcherEventDetailsAvoidDeadLoopInternal(); + } + } + + public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration. + YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10); + Log log = mock(Log.class); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + + Field logger = AsyncDispatcher.class.getDeclaredField("LOG"); + logger.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL); + Object oldLog = logger.get(null); + + try { + logger.set(null, log); + dispatcher.register(TestEnum.class, new TestHandler(0)); + dispatcher.start(); + + for (int i = 0; i < 10000; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType); + dispatcher.getEventHandler().handle(event); + } + Thread.sleep(3000); + } finally { + //... restore logger object + logger.set(null, oldLog); + dispatcher.stop(); + } + } + }