diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1e55fe383b0..0f1c544a8ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -2483,6 +2483,20 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000; + /** + * The threshold used to trigger the logging of event types and counts + * in RM's main event dispatcher. Default value is 5000, + * which means RM will print events info when the queue size cumulatively + * reaches 5000 every time. Such info can be used to reveal what + * kind of events that RM is stuck at processing mostly, + * it can help to narrow down certain performance issues. + */ + public static final String + YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = + YARN_PREFIX + "dispatcher.print-events-info.threshold"; + public static final int + DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000; + /** * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH * entries diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 5019369104d..15168e9041b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +32,7 @@ import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -55,8 +57,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private final BlockingQueue eventQueue; private volatile int lastEventQueueSizeLogged = 0; + private volatile int lastEventDetailsQueueSizeLogged = 0; private volatile boolean stopped = false; + //Configuration for control the details queue event printing. + private int detailsInterval; + private boolean printTrigger = false; + // Configuration flag for enabling/disabling draining dispatcher's events on // stop functionality. private volatile boolean drainEventsOnStop = false; @@ -129,6 +136,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } if (event != null) { dispatch(event); + if (printTrigger) { + //Log the latest dispatch event type + // may cause the too many events queued + LOG.info("Latest dispatch event type: " + event.getType()); + printTrigger = false; + } } } } @@ -140,6 +153,15 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { exitOnDispatchException = false; } + @Override + protected void serviceInit(Configuration conf) throws Exception{ + super.serviceInit(conf); + this.detailsInterval = getConfig().getInt(YarnConfiguration. + YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, + YarnConfiguration. + DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD); + } + @Override protected void serviceStart() throws Exception { //start all the components @@ -246,6 +268,17 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } class GenericEventHandler implements EventHandler { + private void printEventQueueDetails(BlockingQueue queue) { + Map counterMap = eventQueue.stream(). + collect(Collectors. + groupingBy(e -> e.getType(), Collectors.counting()) + ); + for (Map.Entry entry : counterMap.entrySet()) { + long num = entry.getValue(); + LOG.info("Event type: " + entry.getKey() + + ", Event record counter: " + num); + } + } public void handle(Event event) { if (blockNewEvents) { return; @@ -259,6 +292,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { lastEventQueueSizeLogged = qSize; LOG.info("Size of event-queue is " + qSize); } + if (qSize != 0 && qSize % detailsInterval == 0 + && lastEventDetailsQueueSizeLogged != qSize) { + lastEventDetailsQueueSizeLogged = qSize; + printEventQueueDetails(eventQueue); + printTrigger = true; + } int remCapacity = eventQueue.remainingCapacity(); if (remCapacity < 1000) { LOG.warn("Very low remaining capacity in the event-queue: " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 7a672dee6c6..e2be5688a28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -107,6 +107,19 @@ 300000 + + + The threshold used to trigger the logging of event types + and counts in RM's main event dispatcher. Default length is 5000, + which means RM will print events info when the queue size cumulatively + reaches 5000 every time. Such info can be used to reveal what kind of events + that RM is stuck at processing mostly, it can help to + narrow down certain performance issues. + + yarn.dispatcher.print-events-info.threshold + 5000 + + The expiry interval for application master reporting. yarn.am.liveness-monitor.expiry-interval-ms diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 2b9d7455a9d..762e2280ca3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.event; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.slf4j.Logger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -93,6 +96,20 @@ public class TestAsyncDispatcher { DUMMY } + private static class TestHandler implements EventHandler { + @Override + public void handle(Event event) { + try { + // As long as 10000 events queued + Thread.sleep(1500); + } catch (InterruptedException e) {} + } + } + + private enum TestEnum { + TestEventType + } + @SuppressWarnings({ "rawtypes", "unchecked" }) private void dispatchDummyEvents(Dispatcher disp, int count) { for (int i = 0; i < count; i++) { @@ -119,5 +136,45 @@ public class TestAsyncDispatcher { disp.close(); assertEquals(0, queue.size()); } + + //Test print dispatcher details when the blocking queue is heavy + @Test(timeout = 10000) + public void testPrintDispatcherEventDetails() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration. + YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 5000); + Logger log = mock(Logger.class); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + + Field logger = AsyncDispatcher.class.getDeclaredField("LOG"); + logger.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL); + Object oldLog = logger.get(null); + + try { + logger.set(null, log); + dispatcher.register(TestEnum.class, new TestHandler()); + dispatcher.start(); + + for (int i = 0; i < 10000; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType); + dispatcher.getEventHandler().handle(event); + } + verify(log, atLeastOnce()).info("Event type: TestEventType, " + + "Event record counter: 5000"); + Thread.sleep(2000); + //Make sure more than one event to take + verify(log, atLeastOnce()). + info("Latest dispatch event type: TestEventType"); + dispatcher.stop(); + } finally { + //... restore logger object + logger.set(null, oldLog); + } + } }