YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN. Contributed by Sidharta Seethana.

Varun Vasudev 2016-02-11 12:06:42 +05:30
parent 663a80031c
commit fa00d3e205
16 changed files with 44 additions and 66 deletions
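The substitution below is mechanical: every pool built directly through java.util.concurrent.Executors, plus two bare ScheduledThreadPoolExecutor instances, is rebuilt through the new org.apache.hadoop.util.concurrent.HadoopExecutors factories or HadoopScheduledThreadPoolExecutor, so that a Throwable escaping a pool thread gets logged instead of vanishing. As a rough sketch of the idea (illustrative only; the wrapper class below is hypothetical, and only the HadoopExecutors and HadoopScheduledThreadPoolExecutor names come from the diff itself), an afterExecute override can recover and log failures from both execute()d and submit()ted tasks:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

// Hypothetical sketch of an exception-logging pool, not the Hadoop code.
class LoggingScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {

  LoggingScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory tf) {
    super(corePoolSize, tf);
  }

  @Override
  protected void afterExecute(Runnable task, Throwable throwable) {
    super.afterExecute(task, throwable);
    // Tasks submitted via submit() are wrapped in a Future, so the
    // Throwable parameter is null and the failure hides in Future.get().
    if (throwable == null && task instanceof Future<?>) {
      Future<?> future = (Future<?>) task;
      if (future.isDone() && !future.isCancelled()) {
        try {
          future.get();
        } catch (ExecutionException e) {
          throwable = e.getCause();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }
    if (throwable != null) {
      // The Hadoop classes would log via commons-logging; System.err
      // keeps this sketch self-contained.
      System.err.println("Uncaught exception in pool thread: " + throwable);
    }
  }
}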


@@ -128,6 +128,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4628. Display application priority in yarn top.
     (Bibin A Chundatt via vvasudev)
 
+    YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN.
+    (Sidharta Seethana via vvasudev)
+
   OPTIMIZATIONS
 
   BUG FIXES


@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.MultiException;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -40,7 +41,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 /**
@@ -137,7 +137,7 @@ public class RequestHedgingRMFailoverProxyProvider<T>
     try {
       Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
       int numAttempts = 0;
-      executor = Executors.newFixedThreadPool(allProxies.size());
+      executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
       completionService = new ExecutorCompletionService<>(executor);
       for (final ProxyInfo<T> pInfo : allProxies.values()) {
         Callable<Object> c = new Callable<Object>() {


@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +52,7 @@ import java.util.zip.GZIPOutputStream;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.Assert;
 
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -273,7 +273,7 @@ public class TestFSDownload {
     Map<LocalResource,Future<Path>> pending =
       new HashMap<LocalResource,Future<Path>>();
-    ExecutorService exec = Executors.newSingleThreadExecutor();
+    ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
     LocalDirAllocator dirs =
       new LocalDirAllocator(TestFSDownload.class.getName());
     int size = 512;
@@ -362,7 +362,7 @@
       });
     }
 
-    ExecutorService exec = Executors.newFixedThreadPool(fileCount);
+    ExecutorService exec = HadoopExecutors.newFixedThreadPool(fileCount);
     try {
       List<Future<Boolean>> futures = exec.invokeAll(tasks);
       // files should be public
@@ -399,7 +399,7 @@
     Map<LocalResource,Future<Path>> pending =
       new HashMap<LocalResource,Future<Path>>();
-    ExecutorService exec = Executors.newSingleThreadExecutor();
+    ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
     LocalDirAllocator dirs =
       new LocalDirAllocator(TestFSDownload.class.getName());
     int[] sizes = new int[10];
@@ -468,7 +468,7 @@
     System.out.println("SEED: " + sharedSeed);
 
     Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
-    ExecutorService exec = Executors.newSingleThreadExecutor();
+    ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
     LocalDirAllocator dirs = new LocalDirAllocator(
         TestFSDownload.class.getName());
@@ -619,7 +619,7 @@
     Map<LocalResource,Future<Path>> pending =
       new HashMap<LocalResource,Future<Path>>();
-    ExecutorService exec = Executors.newSingleThreadExecutor();
+    ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
     LocalDirAllocator dirs =
       new LocalDirAllocator(TestFSDownload.class.getName());
     for (int i = 0; i < 5; ++i) {
@@ -674,7 +674,8 @@
     files.mkdir(basedir, null, true);
     conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
 
-    ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
+    ExecutorService singleThreadedExec = HadoopExecutors
+        .newSingleThreadExecutor();
 
     LocalDirAllocator dirs =
       new LocalDirAllocator(TestFSDownload.class.getName());


@@ -36,6 +36,7 @@ import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
 import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
 import org.apache.hadoop.registry.client.types.RegistryPathStatus;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
@@ -53,7 +54,6 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -109,7 +109,7 @@ public class RegistryAdminService extends RegistryOperationsService {
   public RegistryAdminService(String name,
       RegistryBindingSource bindingSource) {
     super(name, bindingSource);
-    executor = Executors.newCachedThreadPool(
+    executor = HadoopExecutors.newCachedThreadPool(
         new ThreadFactory() {
           private AtomicInteger counter = new AtomicInteger(1);


@@ -29,8 +29,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +42,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
 import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -116,12 +115,12 @@
         .setNameFormat("DeletionService #%d")
         .build();
     if (conf != null) {
-      sched = new DelServiceSchedThreadPoolExecutor(
+      sched = new HadoopScheduledThreadPoolExecutor(
           conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
           YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
       debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
     } else {
-      sched = new DelServiceSchedThreadPoolExecutor(
+      sched = new HadoopScheduledThreadPoolExecutor(
          YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
     }
     sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@@ -158,34 +157,6 @@
     return getServiceState() == STATE.STOPPED && sched.isTerminated();
   }
 
-  private static class DelServiceSchedThreadPoolExecutor extends
-      ScheduledThreadPoolExecutor {
-
-    public DelServiceSchedThreadPoolExecutor(int corePoolSize,
-        ThreadFactory threadFactory) {
-      super(corePoolSize, threadFactory);
-    }
-
-    @Override
-    protected void afterExecute(Runnable task, Throwable exception) {
-      if (task instanceof FutureTask<?>) {
-        FutureTask<?> futureTask = (FutureTask<?>) task;
-        if (!futureTask.isCancelled()) {
-          try {
-            futureTask.get();
-          } catch (ExecutionException ee) {
-            exception = ee.getCause();
-          } catch (InterruptedException ie) {
-            exception = ie;
-          }
-        }
-      }
-      if (exception != null) {
-        LOG.error("Exception during execution of task in DeletionService",
-            exception);
-      }
-    }
-  }
-
   public static class FileDeletionTask implements Runnable {
     public static final int INVALID_TASK_ID = -1;
     private int taskId;
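The inner class deleted just above is the motivation for the whole patch: DeletionService was the only pool that noticed failed tasks, via this hand-rolled afterExecute override, while the other executors in this commit dropped them silently. Centralizing that logging in HadoopScheduledThreadPoolExecutor and the HadoopExecutors factories gives every pool the same behavior. Call sites swap one-for-one because the factory methods used in this diff (newFixedThreadPool, newSingleThreadExecutor, newCachedThreadPool, newScheduledThreadPool, newSingleThreadScheduledExecutor) mirror the java.util.concurrent.Executors signatures. A small usage sketch (the class and the failing task are hypothetical, not from the patch):

import java.util.concurrent.ExecutorService;

import org.apache.hadoop.util.concurrent.HadoopExecutors;

public class FactorySwapSketch {
  public static void main(String[] args) {
    // Before this patch: Executors.newFixedThreadPool(2); an exception
    // thrown here died silently unless some caller inspected a Future.
    ExecutorService pool = HadoopExecutors.newFixedThreadPool(2);
    pool.execute(() -> {
      // With the Hadoop wrapper this throwable is expected to be logged.
      throw new IllegalStateException("boom");
    });
    pool.shutdown();
  }
}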


@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -65,7 +65,7 @@ public class ContainersLauncher extends AbstractService
   private LocalDirsHandlerService dirsHandler;
   @VisibleForTesting
   public ExecutorService containerLauncher =
-    Executors.newCachedThreadPool(
+    HadoopExecutors.newCachedThreadPool(
         new ThreadFactoryBuilder()
           .setNameFormat("ContainersLauncher #%d")
           .build());


@@ -34,7 +34,6 @@ import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -52,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -187,7 +187,7 @@ public class ContainerLocalizer {
   }
 
   ExecutorService createDownloadThreadPool() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+    return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
       .setNameFormat("ContainerLocalizer Downloader").build());
   }


@@ -43,11 +43,9 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -75,6 +73,8 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -194,7 +194,7 @@ public class ResourceLocalizationService extends CompositeService
     this.delService = delService;
     this.dirsHandler = dirsHandler;
-    this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
+    this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1,
         new ThreadFactoryBuilder()
           .setNameFormat("ResourceLocalizationService Cache Cleanup")
           .build());
@@ -784,7 +784,7 @@
     ThreadFactory tf = new ThreadFactoryBuilder()
       .setNameFormat("PublicLocalizer #%d")
       .build();
-    return Executors.newFixedThreadPool(nThreads, tf);
+    return HadoopExecutors.newFixedThreadPool(nThreads, tf);
   }


@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -71,7 +71,7 @@ public class SharedCacheUploadService extends AbstractService implements
     int threadCount =
         conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
             YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
-    uploaderPool = Executors.newFixedThreadPool(threadCount,
+    uploaderPool = HadoopExecutors.newFixedThreadPool(threadCount,
         new ThreadFactoryBuilder().
           setNameFormat("Shared cache uploader #%d").
           build());


@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -40,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -113,7 +113,7 @@ public class LogAggregationService extends AbstractService implements
     this.dirsHandler = dirsHandler;
     this.appLogAggregators =
         new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
-    this.threadPool = Executors.newCachedThreadPool(
+    this.threadPool = HadoopExecutors.newCachedThreadPool(
         new ThreadFactoryBuilder()
           .setNameFormat("LogAggregationService #%d")
           .build());


@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -203,7 +204,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
     ThreadFactory tf =
         new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
     sched =
-        new ScheduledThreadPoolExecutor(conf.getInt(
+        new HadoopScheduledThreadPoolExecutor(conf.getInt(
             YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
             YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
     return sched;


@@ -62,6 +62,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -1758,7 +1759,7 @@ public class TestNodeStatusUpdater {
     final int NUM_THREADS = 10;
     final CountDownLatch allDone = new CountDownLatch(NUM_THREADS);
-    final ExecutorService threadPool = Executors.newFixedThreadPool(
+    final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
         NUM_THREADS);
     final AtomicBoolean stop = new AtomicBoolean(false);


@@ -34,9 +34,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -339,7 +339,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     final List<Throwable> exceptions = Collections.synchronizedList(
         new ArrayList<Throwable>());
-    final ExecutorService threadPool = Executors.newFixedThreadPool(
+    final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
         testThreads);
     try {


@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.sharedcachemanager;
 
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -37,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
@@ -80,7 +80,7 @@ public class CleanerService extends CompositeService {
     // back-to-back runs
     ThreadFactory tf =
         new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
-    scheduledExecutor = Executors.newScheduledThreadPool(2, tf);
+    scheduledExecutor = HadoopExecutors.newScheduledThreadPool(2, tf);
 
     super.serviceInit(conf);
   }


@@ -28,7 +28,6 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -43,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -116,7 +116,7 @@ public class InMemorySCMStore extends SCMStore {
     ThreadFactory tf =
         new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
             .build();
-    scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+    scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(tf);
 
     super.serviceInit(conf);
   }


@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
 import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker;
@@ -121,7 +122,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
     startEmptyStore();
     final String key = "key1";
     int count = 5;
-    ExecutorService exec = Executors.newFixedThreadPool(count);
+    ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
     List<Future<String>> futures = new ArrayList<Future<String>>(count);
     final CountDownLatch start = new CountDownLatch(1);
     for (int i = 0; i < count; i++) {
@@ -197,7 +198,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
     // make concurrent addResourceRef calls (clients)
     int count = 5;
-    ExecutorService exec = Executors.newFixedThreadPool(count);
+    ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
     List<Future<String>> futures = new ArrayList<Future<String>>(count);
     final CountDownLatch start = new CountDownLatch(1);
     for (int i = 0; i < count; i++) {
@@ -235,7 +236,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
     final String user = "user";
     final ApplicationId id = createAppId(1, 1L);
     // add the resource and add the resource ref at the same time
-    ExecutorService exec = Executors.newFixedThreadPool(2);
+    ExecutorService exec = HadoopExecutors.newFixedThreadPool(2);
     final CountDownLatch start = new CountDownLatch(1);
     Callable<String> addKeyTask = new Callable<String>() {
       public String call() throws Exception {