MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.

This commit is contained in:
Varun Vasudev 2016-02-18 14:15:08 +05:30
parent c1afac3a98
commit 2440671a11
15 changed files with 53 additions and 39 deletions

View File

@ -310,6 +310,9 @@ Release 2.9.0 - UNRELEASED
MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter)
MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
mapreduce. (Sidharta Seethana via vvasudev)
OPTIMIZATIONS
BUG FIXES

View File

@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@ -60,6 +59,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -138,7 +138,7 @@ public class LocalContainerLauncher extends AbstractService implements
// make it a daemon thread so that the process can exit even if the task is
// not interruptible
taskRunner =
Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder().
setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
// create and start an event handling thread
eventHandler = new Thread(new EventHandler(), "uber-EventHandler");

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -133,7 +134,7 @@ public class CommitterEventHandler extends AbstractService
tfBuilder.setThreadFactory(backingTf);
}
ThreadFactory tf = tfBuilder.build();
launcherPool = new ThreadPoolExecutor(5, 5, 1,
launcherPool = new HadoopThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
@Override

View File

@ -116,6 +116,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
@ -698,7 +699,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
.setNameFormat("Job Fail Wait Timeout Monitor #%d")
.setDaemon(true)
.build();
this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory);
// This "this leak" is okay because the retained pointer is in an
// instance variable.

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@ -266,7 +267,7 @@ public class ContainerLauncherImpl extends AbstractService implements
"ContainerLauncher #%d").setDaemon(true).build();
// Start with a default core-pool size of 10 and change it dynamically.
launcherPool = new ThreadPoolExecutor(initialPoolSize,
launcherPool = new HadoopThreadPoolExecutor(initialPoolSize,
Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(),
tf);

View File

@ -35,7 +35,6 @@ import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
@ -43,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.util.ConverterUtils;
@ -121,7 +120,7 @@ class LocalDistributedCacheManager {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalDistributedCacheManager Downloader #%d")
.build();
exec = Executors.newCachedThreadPool(tf);
exec = HadoopExecutors.newCachedThreadPool(tf);
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) {

View File

@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -74,6 +73,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@ -428,7 +428,8 @@ public class LocalJobRunner implements ClientProtocol {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalJobRunner Map Task Executor #%d")
.build();
ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
ExecutorService executor = HadoopExecutors.newFixedThreadPool(
maxMapThreads, tf);
return executor;
}
@ -454,7 +455,8 @@ public class LocalJobRunner implements ClientProtocol {
LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
// Create a new executor service to drain the work queue.
ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
ExecutorService executor = HadoopExecutors.newFixedThreadPool(
maxReduceThreads);
return executor;
}

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
/**
* Utility class to fetch block locations for specified Input paths using a
@ -92,7 +92,7 @@ public class LocatedFileStatusFetcher {
IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
rawExec = Executors.newFixedThreadPool(
rawExec = HadoopExecutors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@ -327,22 +328,22 @@ public class TaskLog {
public static ScheduledExecutorService createLogSyncer() {
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Thread for syncLogs");
return t;
}
});
HadoopExecutors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Thread for syncLogs");
return t;
}
});
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
TaskLog.syncLogsShutdown(scheduler);
}
}, 50);
@Override
public void run() {
TaskLog.syncLogsShutdown(scheduler);
}
}, 50);
scheduler.scheduleWithFixedDelay(
new Runnable() {
@Override

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import java.io.IOException;
import java.util.concurrent.*;
@ -84,7 +85,8 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
// Creating a threadpool of the configured size to execute the Mapper
// map method in parallel.
executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
executorService = new HadoopThreadPoolExecutor(numberOfThreads,
numberOfThreads,
0L, TimeUnit.MILLISECONDS,
new BlockingArrayQueue
(numberOfThreads));

View File

@ -25,10 +25,10 @@ import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@ -696,7 +696,7 @@ public class TestFileOutputCommitter extends TestCase {
};
}
final ExecutorService executor = Executors.newFixedThreadPool(2);
final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
try {
for (int i = 0; i < taCtx.length; i++) {
final int taskIdx = i;

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownThreadsHelper;
import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
@ -554,8 +555,9 @@ public class HistoryFileManager extends AbstractService {
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
"MoveIntermediateToDone Thread #%d").build();
moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
numMoveThreads, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(), tf);
super.serviceInit(conf);
}

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -126,7 +127,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
((Service) storage).start();
}
scheduledExecutor = new ScheduledThreadPoolExecutor(2,
scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
.build());

View File

@ -46,7 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -81,6 +80,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@ -475,8 +475,8 @@ public class ShuffleHandler extends AuxiliaryService {
.build();
selector = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory),
HadoopExecutors.newCachedThreadPool(bossFactory),
HadoopExecutors.newCachedThreadPool(workerFactory),
maxShuffleThreads);
super.serviceInit(new Configuration(conf));
}

View File

@ -35,7 +35,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Charsets;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
/** Utility methods */
public class Util {
@ -157,7 +157,8 @@ public class Util {
/** Execute the callables by a number of threads */
public static <T, E extends Callable<T>> void execute(int nThreads, List<E> callables
) throws InterruptedException, ExecutionException {
final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
final ExecutorService executor = HadoopExecutors.newFixedThreadPool(
nThreads);
final List<Future<T>> futures = executor.invokeAll(callables);
for(Future<T> f : futures)
f.get();