YARN-6015. AsyncDispatcher thread name can be set to improved debugging. Contributed by Ajith S.
This commit is contained in:
parent
3190a4ba44
commit
ea38256fab
|
@ -74,6 +74,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
|
protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
|
||||||
private boolean exitOnDispatchException;
|
private boolean exitOnDispatchException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The thread name for dispatcher.
|
||||||
|
*/
|
||||||
|
private String dispatcherThreadName = "AsyncDispatcher event handler";
|
||||||
|
|
||||||
public AsyncDispatcher() {
|
public AsyncDispatcher() {
|
||||||
this(new LinkedBlockingQueue<Event>());
|
this(new LinkedBlockingQueue<Event>());
|
||||||
}
|
}
|
||||||
|
@ -84,6 +89,15 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
|
this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a name for this dispatcher thread.
|
||||||
|
* @param dispatcherName name of the dispatcher thread
|
||||||
|
*/
|
||||||
|
public AsyncDispatcher(String dispatcherName) {
|
||||||
|
this();
|
||||||
|
dispatcherThreadName = dispatcherName;
|
||||||
|
}
|
||||||
|
|
||||||
Runnable createThread() {
|
Runnable createThread() {
|
||||||
return new Runnable() {
|
return new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -130,7 +144,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
||||||
//start all the components
|
//start all the components
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
eventHandlingThread = new Thread(createThread());
|
eventHandlingThread = new Thread(createThread());
|
||||||
eventHandlingThread.setName("AsyncDispatcher event handler");
|
eventHandlingThread.setName(dispatcherThreadName);
|
||||||
eventHandlingThread.start();
|
eventHandlingThread.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -211,7 +211,7 @@ public class CommonNodeLabelsManager extends AbstractService {
|
||||||
// for UT purpose
|
// for UT purpose
|
||||||
protected void initDispatcher(Configuration conf) {
|
protected void initDispatcher(Configuration conf) {
|
||||||
// create async handler
|
// create async handler
|
||||||
dispatcher = new AsyncDispatcher();
|
dispatcher = new AsyncDispatcher("NodeLabelManager dispatcher");
|
||||||
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
|
AsyncDispatcher asyncDispatcher = (AsyncDispatcher) dispatcher;
|
||||||
asyncDispatcher.init(conf);
|
asyncDispatcher.init(conf);
|
||||||
asyncDispatcher.setDrainEventsOnStop();
|
asyncDispatcher.setDrainEventsOnStop();
|
||||||
|
|
|
@ -311,7 +311,7 @@ public class NodeManager extends CompositeService
|
||||||
addService(del);
|
addService(del);
|
||||||
|
|
||||||
// NodeManager level dispatcher
|
// NodeManager level dispatcher
|
||||||
this.dispatcher = new AsyncDispatcher();
|
this.dispatcher = new AsyncDispatcher("NM Event dispatcher");
|
||||||
|
|
||||||
dirsHandler = new LocalDirsHandlerService(metrics);
|
dirsHandler = new LocalDirsHandlerService(metrics);
|
||||||
nodeHealthChecker =
|
nodeHealthChecker =
|
||||||
|
|
|
@ -218,7 +218,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
||||||
this.dirsHandler = dirsHandler;
|
this.dirsHandler = dirsHandler;
|
||||||
|
|
||||||
// ContainerManager level dispatcher.
|
// ContainerManager level dispatcher.
|
||||||
dispatcher = new AsyncDispatcher();
|
dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");
|
||||||
this.deletionService = deletionContext;
|
this.deletionService = deletionContext;
|
||||||
this.metrics = metrics;
|
this.metrics = metrics;
|
||||||
|
|
||||||
|
|
|
@ -388,7 +388,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Dispatcher createDispatcher() {
|
protected Dispatcher createDispatcher() {
|
||||||
return new AsyncDispatcher();
|
return new AsyncDispatcher("RM Event dispatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ResourceScheduler createScheduler() {
|
protected ResourceScheduler createScheduler() {
|
||||||
|
|
|
@ -355,7 +355,7 @@ public class RMApplicationHistoryWriter extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AsyncDispatcher createDispatcher() {
|
protected AsyncDispatcher createDispatcher() {
|
||||||
return new AsyncDispatcher();
|
return new AsyncDispatcher("RM ApplicationHistory dispatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -634,7 +634,7 @@ public class SystemMetricsPublisher extends CompositeService {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected AsyncDispatcher createDispatcher() {
|
protected AsyncDispatcher createDispatcher() {
|
||||||
return new AsyncDispatcher();
|
return new AsyncDispatcher("RM Timeline dispatcher");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -678,7 +678,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception{
|
protected void serviceInit(Configuration conf) throws Exception{
|
||||||
// create async handler
|
// create async handler
|
||||||
dispatcher = new AsyncDispatcher();
|
dispatcher = new AsyncDispatcher("RM StateStore dispatcher");
|
||||||
dispatcher.init(conf);
|
dispatcher.init(conf);
|
||||||
rmStateStoreEventHandler = new ForwardingEventHandler();
|
rmStateStoreEventHandler = new ForwardingEventHandler();
|
||||||
dispatcher.register(RMStateStoreEventType.class,
|
dispatcher.register(RMStateStoreEventType.class,
|
||||||
|
|
Loading…
Reference in New Issue