HBASE-25037 Lots of thread pool are changed to non daemon after HBASE-24750 which causes trouble when shutting down (#2407)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Duo Zhang 2020-09-16 21:11:47 +08:00
parent d15074981f
commit 7a3bb8aefe
37 changed files with 133 additions and 130 deletions

View File

@ -74,9 +74,9 @@ class AsyncConnectionImpl implements AsyncConnection {
@VisibleForTesting @VisibleForTesting
static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer( static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d") new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10, .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
TimeUnit.MILLISECONDS); 10, TimeUnit.MILLISECONDS);
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure"; private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

View File

@ -181,7 +181,7 @@ class ClusterStatusListener implements Closeable {
private DatagramChannel channel; private DatagramChannel channel;
private final EventLoopGroup group = new NioEventLoopGroup(1, private final EventLoopGroup group = new NioEventLoopGroup(1,
new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d") new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
public MulticastListener() { public MulticastListener() {
} }

View File

@ -92,13 +92,13 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class); public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer( protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d") new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10, .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
TimeUnit.MILLISECONDS); 10, TimeUnit.MILLISECONDS);
private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors private static final ScheduledExecutorService IDLE_CONN_SWEEPER =
.newScheduledThreadPool(1, Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d") new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
protected boolean running = true; // if client runs protected boolean running = true; // if client runs

View File

@ -79,7 +79,7 @@ class NettyRpcConnection extends RpcConnection {
private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d") .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
private final NettyRpcClient rpcClient; private final NettyRpcClient rpcClient;

View File

@ -29,8 +29,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;

View File

@ -131,7 +131,7 @@ public class AsyncClientExample extends Configured implements Tool {
TableName tableName = TableName.valueOf(args[0]); TableName tableName = TableName.valueOf(args[0]);
int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS; int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE, ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d") new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
// We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not // We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
// need a thread pool and may have a better performance if you use it correctly as it can save // need a thread pool and may have a better performance if you use it correctly as it can save

View File

@ -18,15 +18,16 @@
package org.apache.hadoop.hbase.chaos.policies; package org.apache.hadoop.hbase.chaos.policies;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.chaos.actions.Action;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.util.StringUtils;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.hadoop.hbase.chaos.actions.Action;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Chaos Monkey policy that will run two different actions at the same time. * Chaos Monkey policy that will run two different actions at the same time.
@ -42,7 +43,8 @@ public class TwoConcurrentActionPolicy extends PeriodicPolicy {
this.actionsOne = actionsOne; this.actionsOne = actionsOne;
this.actionsTwo = actionsTwo; this.actionsTwo = actionsTwo;
executor = Executors.newFixedThreadPool(2, executor = Executors.newFixedThreadPool(2,
new DaemonThreadFactory("TwoConcurrentAction-")); new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }
@Override @Override

View File

@ -102,7 +102,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
// Create the thread pool that will execute RPCs // Create the thread pool that will execute RPCs
threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d") new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
.setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build()); .setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
return true; return true;
} }

View File

@ -25,11 +25,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
/** /**
@ -60,14 +61,11 @@ public class FifoRpcScheduler extends RpcScheduler {
public void start() { public void start() {
LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}", LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
this.getClass().getSimpleName(), handlerCount, maxQueueLength); this.getClass().getSimpleName(), handlerCount, maxQueueLength);
this.executor = new ThreadPoolExecutor( this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
handlerCount, new ArrayBlockingQueue<>(maxQueueLength),
handlerCount, new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").setDaemon(true)
60, .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
TimeUnit.SECONDS, new ThreadPoolExecutor.CallerRunsPolicy());
new ArrayBlockingQueue<>(maxQueueLength),
new DaemonThreadFactory("FifoRpcScheduler.handler"),
new ThreadPoolExecutor.CallerRunsPolicy());
} }
@Override @Override

View File

@ -23,14 +23,15 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory; import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* A special {@code }RpcScheduler} only used for master. This scheduler separates RegionServerReport * A special {@code }RpcScheduler} only used for master. This scheduler separates RegionServerReport
* requests to independent handlers to avoid these requests block other requests. To use this * requests to independent handlers to avoid these requests block other requests. To use this
@ -71,13 +72,15 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler {
this.getClass().getSimpleName(), handlerCount, maxQueueLength, rsReportHandlerCount, this.getClass().getSimpleName(), handlerCount, maxQueueLength, rsReportHandlerCount,
rsRsreportMaxQueueLength); rsRsreportMaxQueueLength);
this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS, this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(maxQueueLength), new ArrayBlockingQueue<>(maxQueueLength),
new DaemonThreadFactory("MasterFifoRpcScheduler.call.handler"), new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.call.handler-pool-%d")
new ThreadPoolExecutor.CallerRunsPolicy()); .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(rsRsreportMaxQueueLength), TimeUnit.SECONDS, new ArrayBlockingQueue<>(rsRsreportMaxQueueLength),
new DaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"), new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.RSReport.handler-pool-%d")
new ThreadPoolExecutor.CallerRunsPolicy()); .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
} }
@Override @Override

View File

@ -248,7 +248,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
private DatagramChannel channel; private DatagramChannel channel;
private final EventLoopGroup group = new NioEventLoopGroup(1, private final EventLoopGroup group = new NioEventLoopGroup(1,
new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d") new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
public MulticastPublisher() { public MulticastPublisher() {
} }

View File

@ -680,7 +680,7 @@ public class SplitTableRegionProcedure
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" + LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads); getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads, final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d") new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles); final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

View File

@ -17,17 +17,17 @@
*/ */
package org.apache.hadoop.hbase.master.cleaner; package org.apache.hadoop.hbase.master.cleaner;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* The thread pool used for scan directories * The thread pool used for scan directories
*/ */
@ -51,10 +51,9 @@ public class DirScanPool implements ConfigurationObserver {
} }
private static ThreadPoolExecutor initializePool(int size) { private static ThreadPoolExecutor initializePool(int size) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, size, 1, TimeUnit.MINUTES, return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(), new DaemonThreadFactory("dir-scan-pool")); new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").setDaemon(true)
executor.allowCoreThreadTimeOut(true); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
return executor;
} }
/** /**

View File

@ -63,12 +63,10 @@ public class NamedQueueRecorder {
int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024); int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
// disruptor initialization with BlockingWaitStrategy // disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(RingBufferEnvelope::new, this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
getEventCount(eventCount),
new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d") new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI, ProducerType.MULTI, new BlockingWaitStrategy());
new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler()); this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
// initialize ringbuffer event handler // initialize ringbuffer event handler

View File

@ -28,16 +28,16 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker; import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* This is the master side of a distributed complex procedure execution. * This is the master side of a distributed complex procedure execution.
@ -112,8 +112,9 @@ public class ProcedureCoordinator {
public static ThreadPoolExecutor defaultPool(String coordName, int opThreads, public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
long keepAliveMillis) { long keepAliveMillis) {
return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS, return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(), new SynchronousQueue<>(),
new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool")); new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }
/** /**

View File

@ -25,14 +25,14 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker; import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Process to kick off and manage a running {@link Subprocedure} on a member. This is the * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
@ -86,8 +86,9 @@ public class ProcedureMember implements Closeable {
public static ThreadPoolExecutor defaultPool(String memberName, int procThreads, public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
long keepAliveMillis) { long keepAliveMillis) {
return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS, return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(), new SynchronousQueue<>(),
new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool")); new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }
/** /**

View File

@ -27,10 +27,8 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
@ -51,6 +49,9 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
/** /**
@ -227,7 +228,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS); int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
this.name = name; this.name = name;
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool-")); new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor); taskPool = new ExecutorCompletionService<>(executor);
} }

View File

@ -515,7 +515,7 @@ class MemStoreFlusher implements FlushRequester {
synchronized void start(UncaughtExceptionHandler eh) { synchronized void start(UncaughtExceptionHandler eh) {
ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder() ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d") .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
.setUncaughtExceptionHandler(eh).build(); .setDaemon(true).setUncaughtExceptionHandler(eh).build();
for (int i = 0; i < flushHandlers.length; i++) { for (int i = 0; i < flushHandlers.length; i++) {
flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i); flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
flusherThreadFactory.newThread(flushHandlers[i]); flusherThreadFactory.newThread(flushHandlers[i]);

View File

@ -28,17 +28,11 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
@ -53,12 +47,19 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
/** /**
* This manager class handles the work dealing with snapshots for a {@link HRegionServer}. * This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
* <p> * <p>
@ -284,7 +285,8 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS); int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
this.name = name; this.name = name;
executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-")); new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor); taskPool = new ExecutorCompletionService<>(executor);
} }

View File

@ -241,10 +241,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
String hostingThreadName = Thread.currentThread().getName(); String hostingThreadName = Thread.currentThread().getName();
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
// spinning as other strategies do. // spinning as other strategies do.
this.disruptor = new Disruptor<>(RingBufferTruck::new, this.disruptor = new Disruptor<>(RingBufferTruck::new, getPreallocatedEventCount(),
getPreallocatedEventCount(),
new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d") new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI, new BlockingWaitStrategy()); ProducerType.MULTI, new BlockingWaitStrategy());
// Advance the ring buffer sequence so that it starts from 1 instead of 0, // Advance the ring buffer sequence so that it starts from 1 instead of 0,
// because SyncFuture.NOT_DONE = 0. // because SyncFuture.NOT_DONE = 0.

View File

@ -18,19 +18,6 @@
package org.apache.hadoop.hbase.security.access; package org.apache.hadoop.hbase.security.access;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -41,6 +28,20 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/** /**
* Handles synchronization of access control list entries and updates * Handles synchronization of access control list entries and updates
@ -69,7 +70,8 @@ public class ZKPermissionWatcher extends ZKListener implements Closeable {
String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE); String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent); this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
executor = Executors.newSingleThreadExecutor( executor = Executors.newSingleThreadExecutor(
new DaemonThreadFactory("zk-permission-watcher")); new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }
public void start() throws KeeperException { public void start() throws KeeperException {

View File

@ -616,7 +616,7 @@ public final class SnapshotManifest {
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) { public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8); int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d") new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }

View File

@ -109,7 +109,7 @@ public class HFileContentValidator extends AbstractHBaseTool {
int availableProcessors = Runtime.getRuntime().availableProcessors(); int availableProcessors = Runtime.getRuntime().availableProcessors();
int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors); int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
return Executors.newFixedThreadPool(numThreads, return Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("hfile-validator-pool-%d") new ThreadFactoryBuilder().setNameFormat("hfile-validator-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }

View File

@ -1638,7 +1638,7 @@ public final class FSUtils {
// run in multiple threads // run in multiple threads
final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize, final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d") new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
try { try {
// ignore all file status items that are not of interest // ignore all file status items that are not of interest

View File

@ -351,7 +351,7 @@ public class HBaseFsck extends Configured implements Closeable {
private static ExecutorService createThreadPool(Configuration conf) { private static ExecutorService createThreadPool(Configuration conf) {
int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS); int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
return new ScheduledThreadPoolExecutor(numThreads, return new ScheduledThreadPoolExecutor(numThreads,
new ThreadFactoryBuilder().setNameFormat("hbasefsck-pool-%d") new ThreadFactoryBuilder().setNameFormat("hbasefsck-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
} }

View File

@ -228,13 +228,12 @@ public abstract class ModifyRegionUtils {
* "hbase.hregion.open.and.init.threads.max" property. * "hbase.hregion.open.and.init.threads.max" property.
*/ */
static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf, static ThreadPoolExecutor getRegionOpenAndInitThreadPool(final Configuration conf,
final String threadNamePrefix, int regionNumber) { final String threadNamePrefix, int regionNumber) {
int maxThreads = Math.min(regionNumber, conf.getInt( int maxThreads =
"hbase.hregion.open.and.init.threads.max", 16)); Math.min(regionNumber, conf.getInt("hbase.hregion.open.and.init.threads.max", 16));
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads. ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.getBoundedCachedThreadPool(maxThreads,
getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, 30L, TimeUnit.SECONDS, new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d")
new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d") .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
return regionOpenAndInitThreadPool; return regionOpenAndInitThreadPool;
} }
} }

View File

@ -77,7 +77,7 @@ public abstract class OutputSink {
this.controller = controller; this.controller = controller;
this.entryBuffers = entryBuffers; this.entryBuffers = entryBuffers;
this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d") new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool); this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
} }

View File

@ -93,7 +93,7 @@ public class AcidGuaranteesTestTool extends AbstractHBaseTool {
ThreadPoolExecutor tpe = ThreadPoolExecutor tpe =
new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d") new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
tpe.allowCoreThreadTimeOut(true); tpe.allowCoreThreadTimeOut(true);
return tpe; return tpe;

View File

@ -139,7 +139,7 @@ public class TestAsyncTableGetMultiThreaded {
int numThreads = 7; int numThreads = 7;
AtomicBoolean stop = new AtomicBoolean(false); AtomicBoolean stop = new AtomicBoolean(false);
ExecutorService executor = Executors.newFixedThreadPool(numThreads, ExecutorService executor = Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d") new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
List<Future<?>> futures = new ArrayList<>(); List<Future<?>> futures = new ArrayList<>();
IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> { IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {

View File

@ -101,9 +101,9 @@ public class TestOpenTableInCoprocessor {
private ExecutorService getPool() { private ExecutorService getPool() {
int maxThreads = 1; int maxThreads = 1;
long keepAliveTime = 60; long keepAliveTime = 60;
ThreadPoolExecutor pool = ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime,
new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, TimeUnit.SECONDS, new SynchronousQueue<>(),
new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d") new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
pool.allowCoreThreadTimeOut(true); pool.allowCoreThreadTimeOut(true);
return pool; return pool;

View File

@ -63,7 +63,7 @@ public class TestRegionStates {
@BeforeClass @BeforeClass
public static void setUp() throws Exception { public static void setUp() throws Exception {
threadPool = Threads.getBoundedCachedThreadPool(32, 60L, TimeUnit.SECONDS, threadPool = Threads.getBoundedCachedThreadPool(32, 60L, TimeUnit.SECONDS,
new ThreadFactoryBuilder().setNameFormat("ProcedureDispatcher-pool-%d") new ThreadFactoryBuilder().setNameFormat("ProcedureDispatcher-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler((t, e) -> LOG.warn("Failed thread " + t.getName(), e)) .setUncaughtExceptionHandler((t, e) -> LOG.warn("Failed thread " + t.getName(), e))
.build()); .build());
executorService = new ExecutorCompletionService(threadPool); executorService = new ExecutorCompletionService(threadPool);

View File

@ -25,22 +25,22 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
public class SimpleRSProcedureManager extends RegionServerProcedureManager { public class SimpleRSProcedureManager extends RegionServerProcedureManager {
private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class); private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class);
@ -125,9 +125,9 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager {
public SimpleSubprocedurePool(String name, Configuration conf) { public SimpleSubprocedurePool(String name, Configuration conf) {
this.name = name; this.name = name;
executor = new ThreadPoolExecutor(1, 1, 500, executor = Executors.newSingleThreadExecutor(
TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
new DaemonThreadFactory("rs(" + name + ")-procedure-pool-")); .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
taskPool = new ExecutorCompletionService<>(executor); taskPool = new ExecutorCompletionService<>(executor);
} }

View File

@ -234,7 +234,7 @@ public class TestRegionServerReportForDuty {
@Test @Test
public void testReportForDutyWithRSRpcRetry() throws Exception { public void testReportForDutyWithRSRpcRetry() throws Exception {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d") new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
// Start a master and wait for it to become the active/primary master. // Start a master and wait for it to become the active/primary master.

View File

@ -83,9 +83,9 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
GROUP = new NioEventLoopGroup(1, GROUP =
new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d") new NioEventLoopGroup(1, new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d")
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
CHANNEL_CLASS = NioSocketChannel.class; CHANNEL_CLASS = NioSocketChannel.class;
AbstractTestFSWAL.setUpBeforeClass(); AbstractTestFSWAL.setUpBeforeClass();
} }

View File

@ -53,7 +53,7 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
GROUP = new NioEventLoopGroup(1, GROUP = new NioEventLoopGroup(1,
new ThreadFactoryBuilder().setNameFormat("TestAsyncWALReplay-pool-%d") new ThreadFactoryBuilder().setNameFormat("TestAsyncWALReplay-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
CHANNEL_CLASS = NioSocketChannel.class; CHANNEL_CLASS = NioSocketChannel.class;
Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration(); Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();

View File

@ -66,10 +66,9 @@ public class TestHBaseFsckMOB extends BaseTestHBaseFsck {
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
tableExecutorService = tableExecutorService = new ThreadPoolExecutor(1, POOL_SIZE, 60, TimeUnit.SECONDS,
new ThreadPoolExecutor(1, POOL_SIZE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("testhbck-pool-%d")
new ThreadFactoryBuilder().setNameFormat("testhbck-pool-%d") .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
hbfsckExecutorService = new ScheduledThreadPoolExecutor(POOL_SIZE); hbfsckExecutorService = new ScheduledThreadPoolExecutor(POOL_SIZE);

View File

@ -145,7 +145,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
this.handler = hand; this.handler = hand;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d") new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
MBeans.register("thrift", "Thrift", this); MBeans.register("thrift", "Thrift", this);
} }