MAPREDUCE-3187. Add names for various unnamed threads in MR2. (Todd Lipcon and Siddharth Seth via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1184904 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-10-16 19:27:02 +00:00
parent 50cb2771e9
commit 68328ae926
21 changed files with 106 additions and 18 deletions

View File

@ -388,6 +388,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3059. QueueMetrics do not have metrics for aggregate
containers-allocated and aggregate containers-released.
(Devaraj K via mahadev)
MAPREDUCE-3187. Add names for various unnamed threads in MR2.
(Todd Lipcon and Siddharth Seth via mahadev)
OPTIMIZATIONS

View File

@ -71,6 +71,7 @@ public class TaskHeartbeatHandler extends AbstractService {
@Override
public void start() {
lostTaskCheckerThread = new Thread(new PingChecker());
lostTaskCheckerThread.setName("TaskHeartbeatHandler PingChecker");
lostTaskCheckerThread.start();
super.start();
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -63,6 +64,8 @@ import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* This class is responsible for launching of containers.
*/
@ -100,9 +103,13 @@ public class ContainerLauncherImpl extends AbstractService implements
public void start() {
// Start with a default core-pool size of 10 and change it dynamically.
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("ContainerLauncher #%d")
.build();
launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE,
Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>());
new LinkedBlockingQueue<Runnable>(),
tf);
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@ -143,6 +150,7 @@ public class ContainerLauncherImpl extends AbstractService implements
}
}
});
eventHandlingThread.setName("ContainerLauncher Event Handler");
eventHandlingThread.start();
super.start();
}

View File

@ -242,6 +242,7 @@ public abstract class RMCommunicator extends AbstractService {
}
}
});
allocatorThread.setName("RMCommunicator Allocator");
allocatorThread.start();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app.taskclean;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -31,6 +32,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
@ -47,8 +50,11 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
}
public void start() {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("TaskCleaner #%d")
.build();
launcherPool = new ThreadPoolExecutor(1, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@ -65,6 +71,7 @@ public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
launcherPool.execute(new EventProcessor(event)); }
}
});
eventHandlingThread.setName("TaskCleaner Event Handler");
eventHandlingThread.start();
super.start();
}

View File

@ -32,6 +32,7 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -64,6 +65,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/*
* Loads and manages the Job history cache.
@ -256,7 +259,9 @@ public class JobHistory extends AbstractService implements HistoryContext {
if (startCleanerService) {
long maxAgeOfHistoryFiles = conf.getLong(
JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()
);
long runInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
cleanerScheduledExecutor
@ -594,8 +599,11 @@ public class JobHistory extends AbstractService implements HistoryContext {
MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
this.sleepTime = sleepTime;
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("MoveIntermediateToDone Thread #%d")
.build();
moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
running = true;
}

View File

@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
@ -103,6 +104,8 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ShuffleHandler extends AbstractService
implements AuxServices.AuxiliaryService {
@ -228,8 +231,16 @@ public class ShuffleHandler extends AbstractService
@Override
public synchronized void init(Configuration conf) {
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
.build();
ThreadFactory workerFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Worker #%d")
.build();
selector = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory));
super.init(new Configuration(conf));
}

View File

@ -84,6 +84,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
//start all the components
super.start();
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName("AsyncDispatcher event handler");
eventHandlingThread.start();
}

View File

@ -108,6 +108,7 @@ public class CompositeService extends AbstractService {
private CompositeService compositeService;
public CompositeServiceShutdownHook(CompositeService compositeService) {
super("CompositeServiceShutdownHook for " + compositeService.getName());
this.compositeService = compositeService;
}

View File

@ -51,6 +51,7 @@ public abstract class AbstractLivelinessMonitor<O> extends AbstractService {
@Override
public void start() {
checkerThread = new Thread(new PingChecker());
checkerThread.setName("Ping Checker");
checkerThread.start();
super.start();
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.*;
@ -34,6 +35,8 @@ import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class DeletionService extends AbstractService {
static final Log LOG = LogFactory.getLog(DeletionService.class);
private int debugDelay;
@ -70,12 +73,17 @@ public class DeletionService extends AbstractService {
@Override
public void init(Configuration conf) {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("DeletionService #%d")
.build();
if (conf != null) {
sched = new ScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT));
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT),
tf);
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
} else {
sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT);
sched = new ScheduledThreadPoolExecutor(YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT,
tf);
}
sched.setKeepAliveTime(60L, SECONDS);
super.init(conf);

View File

@ -231,7 +231,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void startStatusUpdater() {
new Thread() {
new Thread("Node Status Updater") {
@Override
public void run() {
int lastHeartBeatID = 0;

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The launcher for the containers. This service should be started only after
* the {@link ResourceLocalizationService} is started as it depends on creation
@ -54,7 +56,10 @@ public class ContainersLauncher extends AbstractService
private final ContainerExecutor exec;
private final Dispatcher dispatcher;
private final ExecutorService containerLauncher =
Executors.newCachedThreadPool();
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("ContainersLauncher #%d")
.build());
private final Map<ContainerId,RunningContainer> running =
Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>());

View File

@ -66,6 +66,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.secu
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ContainerLocalizer {
static final Log LOG = LogFactory.getLog(ContainerLocalizer.class);
@ -178,7 +180,8 @@ public class ContainerLocalizer {
}
ExecutorService createDownloadThreadPool() {
return Executors.newSingleThreadExecutor();
return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("ContainerLocalizer Downloader").build());
}
Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,

View File

@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileUtil;
@ -110,6 +111,8 @@ import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ResourceLocalizationService extends CompositeService
implements EventHandler<LocalizationEvent>, LocalizationProtocol {
@ -156,7 +159,10 @@ public class ResourceLocalizationService extends CompositeService
this.delService = delService;
this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
}
FileContext getLocalFileContext(Configuration conf) {
@ -532,6 +538,17 @@ public class ResourceLocalizationService extends CompositeService
}
private static ExecutorService createLocalizerExecutor(Configuration conf) {
int nThreads = conf.getInt(
YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("PublicLocalizer #%d")
.build();
return Executors.newFixedThreadPool(nThreads, tf);
}
class PublicLocalizer extends Thread {
static final String PUBCACHE_CTXT = "public.cache.dirs";
@ -547,16 +564,16 @@ public class ResourceLocalizationService extends CompositeService
PublicLocalizer(Configuration conf) {
this(conf, getLocalFileContext(conf),
Executors.newFixedThreadPool(conf.getInt(
YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT)),
createLocalizerExecutor(conf),
new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
}
PublicLocalizer(Configuration conf, FileContext lfs,
ExecutorService threadPool,
Map<Future<Path>,LocalizerResourceRequestEvent> pending,
Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
super("Public Localizer");
this.lfs = lfs;
this.conf = conf;
this.pending = pending;
@ -673,6 +690,7 @@ public class ResourceLocalizationService extends CompositeService
RecordFactoryProvider.getRecordFactory(getConfig());
LocalizerRunner(LocalizerContext context, String localizerId) {
super("LocalizerRunner for " + localizerId);
this.context = context;
this.localizerId = localizerId;
this.pending = new ArrayList<LocalizerResourceRequestEvent>();
@ -863,6 +881,7 @@ public class ResourceLocalizationService extends CompositeService
private final Dispatcher dispatcher;
public CacheCleanup(Dispatcher dispatcher) {
super("CacheCleanup");
this.dispatcher = dispatcher;
}

View File

@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements
EventHandler<LogAggregatorEvent> {
@ -70,7 +72,10 @@ public class LogAggregationService extends AbstractService implements
this.deletionService = deletionService;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.threadPool = Executors.newCachedThreadPool();
this.threadPool = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d")
.build());
}
public synchronized void init(Configuration conf) {

View File

@ -309,7 +309,7 @@ public class ContainersMonitorImpl extends AbstractService implements
private class MonitoringThread extends Thread {
public MonitoringThread() {
super("Container Monitor");
}
@Override

View File

@ -20,7 +20,7 @@ log4j.appender.CLA.containerLogDir=${yarn.app.mapreduce.container.log.dir}
log4j.appender.CLA.totalLogFileSize=${yarn.app.mapreduce.container.log.filesize}
log4j.appender.CLA.layout=org.apache.log4j.PatternLayout
log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c: %m%n
#
# Event Counter Appender

View File

@ -250,6 +250,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
super(SchedulerEventDispatcher.class.getName());
this.scheduler = scheduler;
this.eventProcessor = new Thread(new EventProcessor());
this.eventProcessor.setName("ResourceManager Event Processor");
}
@Override

View File

@ -90,6 +90,11 @@ public class ApplicationMasterLauncher extends AbstractService implements
}
private class LauncherThread extends Thread {
public LauncherThread() {
super("ApplicationMaster Launcher");
}
@Override
public void run() {
while (!this.isInterrupted()) {

View File

@ -16,4 +16,4 @@ log4j.rootLogger=info,stdout
log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n