YARN-8995. Log events info in AsyncDispatcher when event queue size cumulatively reaches a certain number every time. Contributed by zhuqi.

This commit is contained in:
Tao Yang 2019-09-05 16:20:05 +08:00 committed by Tao Yang
parent 84a9c3f999
commit 6f9764076a
4 changed files with 123 additions and 0 deletions

View File

@ -2360,6 +2360,20 @@ public class YarnConfiguration extends Configuration {
public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000; public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;
/**
* The threshold used to trigger the logging of event types and counts
* in RM's main event dispatcher. Default value is 5000,
* which means RM will print events info when the queue size cumulatively
* reaches 5000 every time. Such info can be used to reveal what
* kind of events that RM is stuck at processing mostly,
* it can help to narrow down certain performance issues.
*/
public static final String
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD =
YARN_PREFIX + "dispatcher.print-events-info.threshold";
public static final int
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD = 5000;
/** /**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries * entries

View File

@ -24,11 +24,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -50,8 +52,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
private final BlockingQueue<Event> eventQueue; private final BlockingQueue<Event> eventQueue;
private volatile int lastEventQueueSizeLogged = 0; private volatile int lastEventQueueSizeLogged = 0;
private volatile int lastEventDetailsQueueSizeLogged = 0;
private volatile boolean stopped = false; private volatile boolean stopped = false;
//Configuration for control the details queue event printing.
private int detailsInterval;
private boolean printTrigger = false;
// Configuration flag for enabling/disabling draining dispatcher's events on // Configuration flag for enabling/disabling draining dispatcher's events on
// stop functionality. // stop functionality.
private volatile boolean drainEventsOnStop = false; private volatile boolean drainEventsOnStop = false;
@ -124,6 +131,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
} }
if (event != null) { if (event != null) {
dispatch(event); dispatch(event);
if (printTrigger) {
//Log the latest dispatch event type
// may cause the too many events queued
LOG.info("Latest dispatch event type: " + event.getType());
printTrigger = false;
}
} }
} }
} }
@ -135,6 +148,15 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
exitOnDispatchException = false; exitOnDispatchException = false;
} }
@Override
protected void serviceInit(Configuration conf) throws Exception{
super.serviceInit(conf);
this.detailsInterval = getConfig().getInt(YarnConfiguration.
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
YarnConfiguration.
DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
}
@Override @Override
protected void serviceStart() throws Exception { protected void serviceStart() throws Exception {
//start all the components //start all the components
@ -243,6 +265,17 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
} }
class GenericEventHandler implements EventHandler<Event> { class GenericEventHandler implements EventHandler<Event> {
private void printEventQueueDetails(BlockingQueue<Event> queue) {
Map<Enum, Long> counterMap = eventQueue.stream().
collect(Collectors.
groupingBy(e -> e.getType(), Collectors.counting())
);
for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
long num = entry.getValue();
LOG.info("Event type: " + entry.getKey()
+ ", Event record counter: " + num);
}
}
public void handle(Event event) { public void handle(Event event) {
if (blockNewEvents) { if (blockNewEvents) {
return; return;
@ -256,6 +289,12 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
lastEventQueueSizeLogged = qSize; lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize); LOG.info("Size of event-queue is " + qSize);
} }
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
printEventQueueDetails(eventQueue);
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity(); int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) { if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: " LOG.warn("Very low remaining capacity in the event-queue: "

View File

@ -107,6 +107,19 @@
<value>300000</value> <value>300000</value>
</property> </property>
<property>
<description>
The threshold used to trigger the logging of event types
and counts in RM's main event dispatcher. Default length is 5000,
which means RM will print events info when the queue size cumulatively
reaches 5000 every time. Such info can be used to reveal what kind of events
that RM is stuck at processing mostly, it can help to
narrow down certain performance issues.
</description>
<name>yarn.dispatcher.print-events-info.threshold</name>
<value>5000</value>
</property>
<property> <property>
<description>The expiry interval for application master reporting.</description> <description>The expiry interval for application master reporting.</description>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name> <name>yarn.am.liveness-monitor.expiry-interval-ms</name>

View File

@ -18,9 +18,12 @@
package org.apache.hadoop.yarn.event; package org.apache.hadoop.yarn.event;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -93,6 +96,20 @@ public class TestAsyncDispatcher {
DUMMY DUMMY
} }
private static class TestHandler implements EventHandler<Event> {
@Override
public void handle(Event event) {
try {
// As long as 10000 events queued
Thread.sleep(1500);
} catch (InterruptedException e) {}
}
}
private enum TestEnum {
TestEventType
}
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
private void dispatchDummyEvents(Dispatcher disp, int count) { private void dispatchDummyEvents(Dispatcher disp, int count) {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
@ -119,5 +136,45 @@ public class TestAsyncDispatcher {
disp.close(); disp.close();
assertEquals(0, queue.size()); assertEquals(0, queue.size());
} }
//Test print dispatcher details when the blocking queue is heavy
@Test(timeout = 10000)
public void testPrintDispatcherEventDetails() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 5000);
Logger log = mock(Logger.class);
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
logger.setAccessible(true);
Field modifiers = Field.class.getDeclaredField("modifiers");
modifiers.setAccessible(true);
modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
Object oldLog = logger.get(null);
try {
logger.set(null, log);
dispatcher.register(TestEnum.class, new TestHandler());
dispatcher.start();
for (int i = 0; i < 10000; ++i) {
Event event = mock(Event.class);
when(event.getType()).thenReturn(TestEnum.TestEventType);
dispatcher.getEventHandler().handle(event);
}
verify(log, atLeastOnce()).info("Event type: TestEventType, " +
"Event record counter: 5000");
Thread.sleep(2000);
//Make sure more than one event to take
verify(log, atLeastOnce()).
info("Latest dispatch event type: TestEventType");
dispatcher.stop();
} finally {
//... restore logger object
logger.set(null, oldLog);
}
}
} }