MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.
(cherry picked from commit c50d1e54000c76880a041ce5959a2eb23c86bd35)
commit 212c519ad3
parent d4203c9aa2
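The patch below swaps the stock java.util.concurrent factories and pool classes for Hadoop's wrappers in org.apache.hadoop.util.concurrent (HadoopExecutors, HadoopThreadPoolExecutor, HadoopScheduledThreadPoolExecutor), so that any throwable escaping a pooled task gets logged instead of vanishing inside a never-inspected Future. As a rough illustration of the underlying pattern — a minimal sketch only, not the actual Hadoop implementation; the class name LoggingThreadPoolExecutor and its logging choices are hypothetical:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingThreadPoolExecutor extends ThreadPoolExecutor {
  private static final Logger LOG =
      Logger.getLogger(LoggingThreadPoolExecutor.class.getName());

  public LoggingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        threadFactory);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    // execute()d Runnables hand their throwable to afterExecute directly;
    // submit()ted tasks capture it inside the FutureTask, so unwrap it here.
    if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
      try {
        ((Future<?>) r).get();
      } catch (ExecutionException ee) {
        t = ee.getCause();
      } catch (CancellationException ce) {
        // a cancelled task is not an error; nothing to log
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt(); // preserve interrupt status
      }
    }
    if (t != null) {
      LOG.log(Level.WARNING, "Uncaught exception in thread pool task", t);
    }
  }
}

In the diff that follows, the same effect comes from the Hadoop wrapper classes, so every pool created through them reports task failures through the project's logging rather than dying silently.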
@@ -10,6 +10,9 @@ Release 2.9.0 - UNRELEASED
 
     MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter)
 
+    MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
+    mapreduce. (Sidharta Seethana via vvasudev)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -60,6 +59,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -138,7 +138,7 @@ public class LocalContainerLauncher extends AbstractService implements
     // make it a daemon thread so that the process can exit even if the task is
     // not interruptible
     taskRunner =
-        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+        HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder().
             setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
     // create and start an event handling thread
     eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -133,7 +134,7 @@ public class CommitterEventHandler extends AbstractService
       tfBuilder.setThreadFactory(backingTf);
     }
     ThreadFactory tf = tfBuilder.build();
-    launcherPool = new ThreadPoolExecutor(5, 5, 1,
+    launcherPool = new HadoopThreadPoolExecutor(5, 5, 1,
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {
       @Override
@@ -116,6 +116,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -698,7 +699,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
           .setNameFormat("Job Fail Wait Timeout Monitor #%d")
           .setDaemon(true)
           .build();
-      this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+      this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory);
 
       // This "this leak" is okay because the retained pointer is in an
       // instance variable.
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -266,7 +267,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         "ContainerLauncher #%d").setDaemon(true).build();
 
     // Start with a default core-pool size of 10 and change it dynamically.
-    launcherPool = new ThreadPoolExecutor(initialPoolSize,
+    launcherPool = new HadoopThreadPoolExecutor(initialPoolSize,
         Integer.MAX_VALUE, 1, TimeUnit.HOURS,
         new LinkedBlockingQueue<Runnable>(),
         tf);
@@ -35,7 +35,6 @@ import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -121,7 +120,7 @@ class LocalDistributedCacheManager {
     ThreadFactory tf = new ThreadFactoryBuilder()
         .setNameFormat("LocalDistributedCacheManager Downloader #%d")
         .build();
-    exec = Executors.newCachedThreadPool(tf);
+    exec = HadoopExecutors.newCachedThreadPool(tf);
     Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
     Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
     for (LocalResource resource : localResources.values()) {
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -73,6 +72,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
@@ -427,7 +427,8 @@ public class LocalJobRunner implements ClientProtocol {
       ThreadFactory tf = new ThreadFactoryBuilder()
           .setNameFormat("LocalJobRunner Map Task Executor #%d")
           .build();
-      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+      ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+          maxMapThreads, tf);
 
       return executor;
     }
@@ -453,7 +454,8 @@ public class LocalJobRunner implements ClientProtocol {
      LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
 
      // Create a new executor service to drain the work queue.
-     ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+     ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+         maxReduceThreads);
 
      return executor;
    }
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
@@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /**
  * Utility class to fetch block locations for specified Input paths using a
@@ -92,7 +92,7 @@ public class LocatedFileStatusFetcher {
       IOException {
     int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
         FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
-    rawExec = Executors.newFixedThreadPool(
+    rawExec = HadoopExecutors.newFixedThreadPool(
         numThreads,
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("GetFileInfo #%d").build());
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
@@ -327,22 +328,22 @@ public class TaskLog {
 
   public static ScheduledExecutorService createLogSyncer() {
     final ScheduledExecutorService scheduler =
-        Executors.newSingleThreadScheduledExecutor(
+        HadoopExecutors.newSingleThreadScheduledExecutor(
             new ThreadFactory() {
               @Override
               public Thread newThread(Runnable r) {
                 final Thread t = Executors.defaultThreadFactory().newThread(r);
                 t.setDaemon(true);
                 t.setName("Thread for syncLogs");
                 return t;
               }
             });
     ShutdownHookManager.get().addShutdownHook(new Runnable() {
       @Override
       public void run() {
         TaskLog.syncLogsShutdown(scheduler);
       }
     }, 50);
     scheduler.scheduleWithFixedDelay(
         new Runnable() {
           @Override
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 
 import java.io.IOException;
 import java.util.concurrent.*;
@@ -84,7 +85,8 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
 
     // Creating a threadpool of the configured size to execute the Mapper
     // map method in parallel.
-    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
+    executorService = new HadoopThreadPoolExecutor(numberOfThreads,
+        numberOfThreads,
         0L, TimeUnit.MILLISECONDS,
         new BlockingArrayQueue
           (numberOfThreads));
@@ -25,10 +25,10 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -696,7 +696,7 @@ public class TestFileOutputCommitter extends TestCase {
       };
     }
 
-    final ExecutorService executor = Executors.newFixedThreadPool(2);
+    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
       for (int i = 0; i < taCtx.length; i++) {
         final int taskIdx = i;
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ShutdownThreadsHelper;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -554,8 +555,9 @@ public class HistoryFileManager extends AbstractService {
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
     ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
         "MoveIntermediateToDone Thread #%d").build();
-    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
-        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+    moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
+        numMoveThreads, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>(), tf);
 
     super.serviceInit(conf);
   }
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -126,7 +127,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
       ((Service) storage).start();
     }
 
-    scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+    scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
         new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
             .build());
 
@@ -46,7 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -81,6 +80,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -475,8 +475,8 @@ public class ShuffleHandler extends AuxiliaryService {
         .build();
 
     selector = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory),
+        HadoopExecutors.newCachedThreadPool(bossFactory),
+        HadoopExecutors.newCachedThreadPool(workerFactory),
         maxShuffleThreads);
     super.serviceInit(new Configuration(conf));
   }
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Charsets;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /** Utility methods */
 public class Util {
@@ -157,7 +157,8 @@ public class Util {
   /** Execute the callables by a number of threads */
   public static <T, E extends Callable<T>> void execute(int nThreads, List<E> callables
       ) throws InterruptedException, ExecutionException {
-    final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+        nThreads);
    final List<Future<T>> futures = executor.invokeAll(callables);
    for(Future<T> f : futures)
      f.get();
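For illustration only — again assuming the hypothetical LoggingThreadPoolExecutor sketched above (not the actual Hadoop wrapper classes), plus Guava's ThreadFactoryBuilder already used throughout this patch — a task that throws now leaves a logged warning rather than an exception trapped in a never-inspected Future:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class UncaughtLoggingDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as the CommitterEventHandler pool in the diff above.
    LoggingThreadPoolExecutor pool = new LoggingThreadPoolExecutor(5, 5, 1,
        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(),
        new ThreadFactoryBuilder().setNameFormat("demo #%d").setDaemon(true)
            .build());
    pool.submit(new Runnable() {
      @Override
      public void run() {
        // With a plain ThreadPoolExecutor this would be silently captured in
        // the discarded Future; here afterExecute logs it as a warning.
        throw new IllegalStateException("simulated task failure");
      }
    });
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }
}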