From 3a5b39e931f460ceec45a80cfe0036eefa6c3096 Mon Sep 17 00:00:00 2001 From: Varun Vasudev Date: Thu, 11 Feb 2016 12:06:42 +0530 Subject: [PATCH] YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN. Contributed by Sidharta Seethana. (cherry picked from commit fa00d3e20560bee412b49e5792595749a247a8ab) --- hadoop-yarn-project/CHANGES.txt | 3 ++ ...RequestHedgingRMFailoverProxyProvider.java | 4 +-- .../hadoop/yarn/util/TestFSDownload.java | 15 ++++---- .../server/services/RegistryAdminService.java | 4 +-- .../server/nodemanager/DeletionService.java | 35 ++----------------- .../launcher/ContainersLauncher.java | 4 +-- .../localizer/ContainerLocalizer.java | 4 +-- .../ResourceLocalizationService.java | 8 ++--- .../sharedcache/SharedCacheUploadService.java | 4 +-- .../logaggregation/LogAggregationService.java | 4 +-- .../loghandler/NonAggregatingLogHandler.java | 3 +- .../nodemanager/TestNodeStatusUpdater.java | 3 +- .../scheduler/fair/TestFSLeafQueue.java | 4 +-- .../sharedcachemanager/CleanerService.java | 4 +-- .../store/InMemorySCMStore.java | 4 +-- .../store/TestInMemorySCMStore.java | 7 ++-- 16 files changed, 44 insertions(+), 66 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ed9665e9280..472c493a554 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -70,6 +70,9 @@ Release 2.9.0 - UNRELEASED YARN-4628. Display application priority in yarn top. (Bibin A Chundatt via vvasudev) + YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN. + (Sidharta Seethana via vvasudev) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java index dc8d19b9543..d076599ed7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.retry.MultiException; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -40,7 +41,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -137,7 +137,7 @@ public class RequestHedgingRMFailoverProxyProvider try { Map, ProxyInfo> proxyMap = new HashMap<>(); int numAttempts = 0; - executor = Executors.newFixedThreadPool(allProxies.size()); + executor = HadoopExecutors.newFixedThreadPool(allProxies.size()); completionService = new ExecutorCompletionService<>(executor); for (final ProxyInfo pInfo : allProxies.values()) { Callable c = new Callable() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java index 3597b314715..a2efb6b693a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java @@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +52,7 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.Assert; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -273,7 +273,7 @@ public class TestFSDownload { Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); int size = 512; @@ -362,7 +362,7 @@ public class TestFSDownload { }); } - ExecutorService exec = Executors.newFixedThreadPool(fileCount); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(fileCount); try { List> futures = exec.invokeAll(tasks); // files should be public @@ -399,7 +399,7 @@ public class TestFSDownload { Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); int[] sizes = new int[10]; @@ -468,7 +468,7 @@ public class TestFSDownload { System.out.println("SEED: " + sharedSeed); Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator( TestFSDownload.class.getName()); @@ -619,7 +619,7 @@ public class TestFSDownload { Map> pending = new HashMap>(); - ExecutorService exec = Executors.newSingleThreadExecutor(); + ExecutorService exec = HadoopExecutors.newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); for (int i = 0; i < 5; ++i) { @@ -674,7 +674,8 @@ public class TestFSDownload { files.mkdir(basedir, null, true); conf.setStrings(TestFSDownload.class.getName(), basedir.toString()); - ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor(); + ExecutorService singleThreadedExec = HadoopExecutors + .newSingleThreadExecutor(); LocalDirAllocator dirs = new LocalDirAllocator(TestFSDownload.class.getName()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java index 513d7ac0aaf..7a20c248db2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java @@ -36,6 +36,7 @@ import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity; import org.apache.hadoop.registry.client.types.RegistryPathStatus; import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; @@ -53,7 +54,6 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; @@ -109,7 +109,7 @@ public class RegistryAdminService extends RegistryOperationsService { public RegistryAdminService(String name, RegistryBindingSource bindingSource) { super(name, bindingSource); - executor = Executors.newCachedThreadPool( + executor = HadoopExecutors.newCachedThreadPool( new ThreadFactory() { private AtomicInteger counter = new AtomicInteger(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java index 2e0cbbf107d..db834b23127 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java @@ -29,8 +29,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -44,6 +42,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext; @@ -116,12 +115,12 @@ public class DeletionService extends AbstractService { .setNameFormat("DeletionService #%d") .build(); if (conf != null) { - sched = new DelServiceSchedThreadPoolExecutor( + sched = new HadoopScheduledThreadPoolExecutor( conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf); debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0); } else { - sched = new DelServiceSchedThreadPoolExecutor( + sched = new HadoopScheduledThreadPoolExecutor( YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf); } sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -158,34 +157,6 @@ public class DeletionService extends AbstractService { return getServiceState() == STATE.STOPPED && sched.isTerminated(); } - private static class DelServiceSchedThreadPoolExecutor extends - ScheduledThreadPoolExecutor { - public DelServiceSchedThreadPoolExecutor(int corePoolSize, - ThreadFactory threadFactory) { - super(corePoolSize, threadFactory); - } - - @Override - protected void afterExecute(Runnable task, Throwable exception) { - if (task instanceof FutureTask) { - FutureTask futureTask = (FutureTask) task; - if (!futureTask.isCancelled()) { - try { - futureTask.get(); - } catch (ExecutionException ee) { - exception = ee.getCause(); - } catch (InterruptedException ie) { - exception = ie; - } - } - } - if (exception != null) { - LOG.error("Exception during execution of task in DeletionService", - exception); - } - } - } - public static class FileDeletionTask implements Runnable { public static final int INVALID_TASK_ID = -1; private int taskId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java index 3a2649eb63c..a34051ce44a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -65,7 +65,7 @@ public class ContainersLauncher extends AbstractService private LocalDirsHandlerService dirsHandler; @VisibleForTesting public ExecutorService containerLauncher = - Executors.newCachedThreadPool( + HadoopExecutors.newCachedThreadPool( new ThreadFactoryBuilder() .setNameFormat("ContainersLauncher #%d") .build()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index f82f894ca70..927699e99ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -34,7 +34,6 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -52,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.SerializedException; @@ -187,7 +187,7 @@ public class ContainerLocalizer { } ExecutorService createDownloadThreadPool() { - return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder() .setNameFormat("ContainerLocalizer Downloader").build()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index c0c2e8e4fbc..b2413add0a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -43,11 +43,9 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -75,6 +73,8 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -194,7 +194,7 @@ public class ResourceLocalizationService extends CompositeService this.delService = delService; this.dirsHandler = dirsHandler; - this.cacheCleanup = new ScheduledThreadPoolExecutor(1, + this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() .setNameFormat("ResourceLocalizationService Cache Cleanup") .build()); @@ -784,7 +784,7 @@ public class ResourceLocalizationService extends CompositeService ThreadFactory tf = new ThreadFactoryBuilder() .setNameFormat("PublicLocalizer #%d") .build(); - return Executors.newFixedThreadPool(nThreads, tf); + return HadoopExecutors.newFixedThreadPool(nThreads, tf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java index cb11f99c553..16c36eb2ebd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.ipc.YarnRPC; @@ -71,7 +71,7 @@ public class SharedCacheUploadService extends AbstractService implements int threadCount = conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT, YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT); - uploaderPool = Executors.newFixedThreadPool(threadCount, + uploaderPool = HadoopExecutors.newFixedThreadPool(threadCount, new ThreadFactoryBuilder(). setNameFormat("Shared cache uploader #%d"). build()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index f64685da543..64115355ac7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -40,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -113,7 +113,7 @@ public class LogAggregationService extends AbstractService implements this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); - this.threadPool = Executors.newCachedThreadPool( + this.threadPool = HadoopExecutors.newCachedThreadPool( new ThreadFactoryBuilder() .setNameFormat("LogAggregationService #%d") .build()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 471e994ae52..d42a4e7ce2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -203,7 +204,7 @@ public class NonAggregatingLogHandler extends AbstractService implements ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build(); sched = - new ScheduledThreadPoolExecutor(conf.getInt( + new HadoopScheduledThreadPoolExecutor(conf.getInt( YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT, YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf); return sched; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 9e6868deb6b..0d85057b5d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -62,6 +62,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -1758,7 +1759,7 @@ public class TestNodeStatusUpdater { final int NUM_THREADS = 10; final CountDownLatch allDone = new CountDownLatch(NUM_THREADS); - final ExecutorService threadPool = Executors.newFixedThreadPool( + final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool( NUM_THREADS); final AtomicBoolean stop = new AtomicBoolean(false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 76374102ebf..7daccad5655 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -34,9 +34,9 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -339,7 +339,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase { final List exceptions = Collections.synchronizedList( new ArrayList()); - final ExecutorService threadPool = Executors.newFixedThreadPool( + final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool( testThreads); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java index 6748387465f..60fc3e57eb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.sharedcachemanager; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -37,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics; @@ -80,7 +80,7 @@ public class CleanerService extends CompositeService { // back-to-back runs ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build(); - scheduledExecutor = Executors.newScheduledThreadPool(2, tf); + scheduledExecutor = HadoopExecutors.newScheduledThreadPool(2, tf); super.serviceInit(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java index d2efb6a14d8..54d736f5372 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java @@ -28,7 +28,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -43,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.StringInterner; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -116,7 +116,7 @@ public class InMemorySCMStore extends SCMStore { ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore") .build(); - scheduler = Executors.newSingleThreadScheduledExecutor(tf); + scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(tf); super.serviceInit(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java index f934dbfda41..6d67ad32fb7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java @@ -45,6 +45,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker; import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker; @@ -121,7 +122,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest { startEmptyStore(); final String key = "key1"; int count = 5; - ExecutorService exec = Executors.newFixedThreadPool(count); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(count); List> futures = new ArrayList>(count); final CountDownLatch start = new CountDownLatch(1); for (int i = 0; i < count; i++) { @@ -197,7 +198,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest { // make concurrent addResourceRef calls (clients) int count = 5; - ExecutorService exec = Executors.newFixedThreadPool(count); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(count); List> futures = new ArrayList>(count); final CountDownLatch start = new CountDownLatch(1); for (int i = 0; i < count; i++) { @@ -235,7 +236,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest { final String user = "user"; final ApplicationId id = createAppId(1, 1L); // add the resource and add the resource ref at the same time - ExecutorService exec = Executors.newFixedThreadPool(2); + ExecutorService exec = HadoopExecutors.newFixedThreadPool(2); final CountDownLatch start = new CountDownLatch(1); Callable addKeyTask = new Callable() { public String call() throws Exception {