YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN. Contributed by Sidharta Seethana.

(cherry picked from commit fa00d3e205)
This commit is contained in:
Varun Vasudev 2016-02-11 12:06:42 +05:30
parent 05b57c87f9
commit 3a5b39e931
16 changed files with 44 additions and 66 deletions

View File

@ -70,6 +70,9 @@ Release 2.9.0 - UNRELEASED
YARN-4628. Display application priority in yarn top. YARN-4628. Display application priority in yarn top.
(Bibin A Chundatt via vvasudev) (Bibin A Chundatt via vvasudev)
YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN.
(Sidharta Seethana via vvasudev)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -40,7 +41,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
/** /**
@ -137,7 +137,7 @@ public class RequestHedgingRMFailoverProxyProvider<T>
try { try {
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>(); Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
int numAttempts = 0; int numAttempts = 0;
executor = Executors.newFixedThreadPool(allProxies.size()); executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
completionService = new ExecutorCompletionService<>(executor); completionService = new ExecutorCompletionService<>(executor);
for (final ProxyInfo<T> pInfo : allProxies.values()) { for (final ProxyInfo<T> pInfo : allProxies.values()) {
Callable<Object> c = new Callable<Object>() { Callable<Object> c = new Callable<Object>() {

View File

@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -53,6 +52,7 @@ import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert; import org.junit.Assert;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@ -273,7 +273,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending = Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>(); new HashMap<LocalResource,Future<Path>>();
ExecutorService exec = Executors.newSingleThreadExecutor(); ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs = LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName()); new LocalDirAllocator(TestFSDownload.class.getName());
int size = 512; int size = 512;
@ -362,7 +362,7 @@ public class TestFSDownload {
}); });
} }
ExecutorService exec = Executors.newFixedThreadPool(fileCount); ExecutorService exec = HadoopExecutors.newFixedThreadPool(fileCount);
try { try {
List<Future<Boolean>> futures = exec.invokeAll(tasks); List<Future<Boolean>> futures = exec.invokeAll(tasks);
// files should be public // files should be public
@ -399,7 +399,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending = Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>(); new HashMap<LocalResource,Future<Path>>();
ExecutorService exec = Executors.newSingleThreadExecutor(); ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs = LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName()); new LocalDirAllocator(TestFSDownload.class.getName());
int[] sizes = new int[10]; int[] sizes = new int[10];
@ -468,7 +468,7 @@ public class TestFSDownload {
System.out.println("SEED: " + sharedSeed); System.out.println("SEED: " + sharedSeed);
Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>(); Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
ExecutorService exec = Executors.newSingleThreadExecutor(); ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs = new LocalDirAllocator( LocalDirAllocator dirs = new LocalDirAllocator(
TestFSDownload.class.getName()); TestFSDownload.class.getName());
@ -619,7 +619,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending = Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>(); new HashMap<LocalResource,Future<Path>>();
ExecutorService exec = Executors.newSingleThreadExecutor(); ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs = LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName()); new LocalDirAllocator(TestFSDownload.class.getName());
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
@ -674,7 +674,8 @@ public class TestFSDownload {
files.mkdir(basedir, null, true); files.mkdir(basedir, null, true);
conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor(); ExecutorService singleThreadedExec = HadoopExecutors
.newSingleThreadExecutor();
LocalDirAllocator dirs = LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName()); new LocalDirAllocator(TestFSDownload.class.getName());

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity; import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
import org.apache.hadoop.registry.client.types.RegistryPathStatus; import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
@ -53,7 +54,6 @@ import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -109,7 +109,7 @@ public class RegistryAdminService extends RegistryOperationsService {
public RegistryAdminService(String name, public RegistryAdminService(String name,
RegistryBindingSource bindingSource) { RegistryBindingSource bindingSource) {
super(name, bindingSource); super(name, bindingSource);
executor = Executors.newCachedThreadPool( executor = HadoopExecutors.newCachedThreadPool(
new ThreadFactory() { new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(1); private AtomicInteger counter = new AtomicInteger(1);

View File

@ -29,8 +29,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -44,6 +42,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@ -116,12 +115,12 @@ public class DeletionService extends AbstractService {
.setNameFormat("DeletionService #%d") .setNameFormat("DeletionService #%d")
.build(); .build();
if (conf != null) { if (conf != null) {
sched = new DelServiceSchedThreadPoolExecutor( sched = new HadoopScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
} else { } else {
sched = new DelServiceSchedThreadPoolExecutor( sched = new HadoopScheduledThreadPoolExecutor(
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
} }
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@ -158,34 +157,6 @@ public class DeletionService extends AbstractService {
return getServiceState() == STATE.STOPPED && sched.isTerminated(); return getServiceState() == STATE.STOPPED && sched.isTerminated();
} }
private static class DelServiceSchedThreadPoolExecutor extends
ScheduledThreadPoolExecutor {
public DelServiceSchedThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
@Override
protected void afterExecute(Runnable task, Throwable exception) {
if (task instanceof FutureTask<?>) {
FutureTask<?> futureTask = (FutureTask<?>) task;
if (!futureTask.isCancelled()) {
try {
futureTask.get();
} catch (ExecutionException ee) {
exception = ee.getCause();
} catch (InterruptedException ie) {
exception = ie;
}
}
}
if (exception != null) {
LOG.error("Exception during execution of task in DeletionService",
exception);
}
}
}
public static class FileDeletionTask implements Runnable { public static class FileDeletionTask implements Runnable {
public static final int INVALID_TASK_ID = -1; public static final int INVALID_TASK_ID = -1;
private int taskId; private int taskId;

View File

@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -65,7 +65,7 @@ public class ContainersLauncher extends AbstractService
private LocalDirsHandlerService dirsHandler; private LocalDirsHandlerService dirsHandler;
@VisibleForTesting @VisibleForTesting
public ExecutorService containerLauncher = public ExecutorService containerLauncher =
Executors.newCachedThreadPool( HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("ContainersLauncher #%d") .setNameFormat("ContainersLauncher #%d")
.build()); .build());

View File

@ -34,7 +34,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -52,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.SerializedException;
@ -187,7 +187,7 @@ public class ContainerLocalizer {
} }
ExecutorService createDownloadThreadPool() { ExecutorService createDownloadThreadPool() {
return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("ContainerLocalizer Downloader").build()); .setNameFormat("ContainerLocalizer Downloader").build());
} }

View File

@ -43,11 +43,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -75,6 +73,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
@ -194,7 +194,7 @@ public class ResourceLocalizationService extends CompositeService
this.delService = delService; this.delService = delService;
this.dirsHandler = dirsHandler; this.dirsHandler = dirsHandler;
this.cacheCleanup = new ScheduledThreadPoolExecutor(1, this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup") .setNameFormat("ResourceLocalizationService Cache Cleanup")
.build()); .build());
@ -784,7 +784,7 @@ public class ResourceLocalizationService extends CompositeService
ThreadFactory tf = new ThreadFactoryBuilder() ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("PublicLocalizer #%d") .setNameFormat("PublicLocalizer #%d")
.build(); .build();
return Executors.newFixedThreadPool(nThreads, tf); return HadoopExecutors.newFixedThreadPool(nThreads, tf);
} }

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -33,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -71,7 +71,7 @@ public class SharedCacheUploadService extends AbstractService implements
int threadCount = int threadCount =
conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT, conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT); YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
uploaderPool = Executors.newFixedThreadPool(threadCount, uploaderPool = HadoopExecutors.newFixedThreadPool(threadCount,
new ThreadFactoryBuilder(). new ThreadFactoryBuilder().
setNameFormat("Shared cache uploader #%d"). setNameFormat("Shared cache uploader #%d").
build()); build());

View File

@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -40,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
@ -113,7 +113,7 @@ public class LogAggregationService extends AbstractService implements
this.dirsHandler = dirsHandler; this.dirsHandler = dirsHandler;
this.appLogAggregators = this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>(); new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
this.threadPool = Executors.newCachedThreadPool( this.threadPool = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder() new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d") .setNameFormat("LogAggregationService #%d")
.build()); .build());

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -203,7 +204,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
ThreadFactory tf = ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build(); new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
sched = sched =
new ScheduledThreadPoolExecutor(conf.getInt( new HadoopScheduledThreadPoolExecutor(conf.getInt(
YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT, YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf); YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
return sched; return sched;

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -1758,7 +1759,7 @@ public class TestNodeStatusUpdater {
final int NUM_THREADS = 10; final int NUM_THREADS = 10;
final CountDownLatch allDone = new CountDownLatch(NUM_THREADS); final CountDownLatch allDone = new CountDownLatch(NUM_THREADS);
final ExecutorService threadPool = Executors.newFixedThreadPool( final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
NUM_THREADS); NUM_THREADS);
final AtomicBoolean stop = new AtomicBoolean(false); final AtomicBoolean stop = new AtomicBoolean(false);

View File

@ -34,9 +34,9 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -339,7 +339,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
final List<Throwable> exceptions = Collections.synchronizedList( final List<Throwable> exceptions = Collections.synchronizedList(
new ArrayList<Throwable>()); new ArrayList<Throwable>());
final ExecutorService threadPool = Executors.newFixedThreadPool( final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
testThreads); testThreads);
try { try {

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -37,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
@ -80,7 +80,7 @@ public class CleanerService extends CompositeService {
// back-to-back runs // back-to-back runs
ThreadFactory tf = ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build(); new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
scheduledExecutor = Executors.newScheduledThreadPool(2, tf); scheduledExecutor = HadoopExecutors.newScheduledThreadPool(2, tf);
super.serviceInit(conf); super.serviceInit(conf);
} }

View File

@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -43,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
@ -116,7 +116,7 @@ public class InMemorySCMStore extends SCMStore {
ThreadFactory tf = ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore") new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
.build(); .build();
scheduler = Executors.newSingleThreadScheduledExecutor(tf); scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(tf);
super.serviceInit(conf); super.serviceInit(conf);
} }

View File

@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker; import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker;
@ -121,7 +122,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
startEmptyStore(); startEmptyStore();
final String key = "key1"; final String key = "key1";
int count = 5; int count = 5;
ExecutorService exec = Executors.newFixedThreadPool(count); ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count); List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1); final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
@ -197,7 +198,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
// make concurrent addResourceRef calls (clients) // make concurrent addResourceRef calls (clients)
int count = 5; int count = 5;
ExecutorService exec = Executors.newFixedThreadPool(count); ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count); List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1); final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
@ -235,7 +236,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
final String user = "user"; final String user = "user";
final ApplicationId id = createAppId(1, 1L); final ApplicationId id = createAppId(1, 1L);
// add the resource and add the resource ref at the same time // add the resource and add the resource ref at the same time
ExecutorService exec = Executors.newFixedThreadPool(2); ExecutorService exec = HadoopExecutors.newFixedThreadPool(2);
final CountDownLatch start = new CountDownLatch(1); final CountDownLatch start = new CountDownLatch(1);
Callable<String> addKeyTask = new Callable<String>() { Callable<String> addKeyTask = new Callable<String>() {
public String call() throws Exception { public String call() throws Exception {