HBASE-24750 : All ExecutorService should use guava ThreadFactoryBuilder

Closes #2196

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Ted Yu <tyu@apache.org>
Viraj Jasani 2020-08-07 20:17:12 +05:30
parent 485e0d2fa4
commit 0b604d921a
39 changed files with 130 additions and 182 deletions
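
The pattern applied at every call site below: the HBase-internal helper Threads.newDaemonThreadFactory(prefix) is replaced with the shaded guava ThreadFactoryBuilder, whose setNameFormat "%d" placeholder is filled with a per-factory thread counter. A minimal sketch of the migration, assuming the hbase-thirdparty shaded guava on the classpath; the pool and thread names are illustrative, not taken from any one call site:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ThreadFactoryMigrationSketch {
  public static void main(String[] args) {
    // Before (removed by this commit):
    //   Executors.newFixedThreadPool(4, Threads.newDaemonThreadFactory("my-task"));
    // After: guava's builder; "%d" is replaced by a per-factory counter, so the
    // threads are named my-task-pool-0, my-task-pool-1, ...
    ExecutorService pool = Executors.newFixedThreadPool(4,
      new ThreadFactoryBuilder().setNameFormat("my-task-pool-%d").build());
    pool.submit(() -> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();
  }
}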

View File

@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,10 +62,9 @@ public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
       LogRollRegionServerProcedureManager.BACKUP_TIMEOUT_MILLIS_DEFAULT);
     int threads = conf.getInt(CONCURENT_BACKUP_TASKS_KEY, DEFAULT_CONCURRENT_BACKUP_TASKS);
     this.name = name;
-    executor =
-        new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<>(),
-            Threads.newDaemonThreadFactory("rs(" + name + ")-backup"));
+    executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
+      new LinkedBlockingQueue<>(),
+      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d").build());
     taskPool = new ExecutorCompletionService<>(executor);
   }

View File

@@ -52,8 +52,8 @@ import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +76,8 @@ class AsyncConnectionImpl implements AsyncConnection {
   @VisibleForTesting
   static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
-    Threads.newDaemonThreadFactory("Async-Client-Retry-Timer"), 10, TimeUnit.MILLISECONDS);
+    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").build(), 10,
+    TimeUnit.MILLISECONDS);
   private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

View File

@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
 import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
@@ -178,8 +178,9 @@ class ClusterStatusListener implements Closeable {
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
   class MulticastListener implements Listener {
     private DatagramChannel channel;
-    private final EventLoopGroup group = new NioEventLoopGroup(
-        1, Threads.newDaemonThreadFactory("hbase-client-clusterStatusListener"));
+    private final EventLoopGroup group = new NioEventLoopGroup(1,
+      new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
+        .build());
     public MulticastListener() {
     }

View File

@@ -41,9 +41,9 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.PoolMap;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,10 +91,12 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);
   protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
-    Threads.newDaemonThreadFactory("RpcClient-timer"), 10, TimeUnit.MILLISECONDS);
+    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").build(), 10,
+    TimeUnit.MILLISECONDS);
   private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
-    .newScheduledThreadPool(1, Threads.newDaemonThreadFactory("Idle-Rpc-Conn-Sweeper"));
+    .newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").build());
   protected boolean running = true; // if client runs

View File

@@ -33,8 +33,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
 import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,8 +76,9 @@ class NettyRpcConnection extends RpcConnection {
   private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);
-  private static final ScheduledExecutorService RELOGIN_EXECUTOR =
-    Executors.newSingleThreadScheduledExecutor(Threads.newDaemonThreadFactory("Relogin"));
+  private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
+    .newSingleThreadScheduledExecutor(
+      new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d").build());
   private final NettyRpcClient rpcClient;

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -197,75 +198,6 @@ public class Threads {
     return boundedCachedThreadPool;
   }
-
-  public static ThreadPoolExecutor getBoundedCachedThreadPool(int maxCachedThread, long timeout,
-      TimeUnit unit, String prefix) {
-    return getBoundedCachedThreadPool(maxCachedThread, timeout, unit,
-      newDaemonThreadFactory(prefix));
-  }
-
-  /**
-   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
-   * with a common prefix.
-   * @param prefix The prefix of every created Thread's name
-   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
-   */
-  public static ThreadFactory getNamedThreadFactory(final String prefix) {
-    SecurityManager s = System.getSecurityManager();
-    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
-        .getThreadGroup();
-    return new ThreadFactory() {
-      final AtomicInteger threadNumber = new AtomicInteger(1);
-      private final int poolNumber = Threads.poolNumber.getAndIncrement();
-      final ThreadGroup group = threadGroup;
-
-      @Override
-      public Thread newThread(Runnable r) {
-        final String name = prefix + "-pool" + poolNumber + "-t" + threadNumber.getAndIncrement();
-        return new Thread(group, r, name);
-      }
-    };
-  }
-
-  /**
-   * Same as {#newDaemonThreadFactory(String, UncaughtExceptionHandler)},
-   * without setting the exception handler.
-   */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
-    return newDaemonThreadFactory(prefix, null);
-  }
-
-  /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads.
-   * @param prefix name prefix for all threads created from the factory
-   * @param handler unhandles exception handler to set for all threads
-   * @return a thread factory that creates named, daemon threads with
-   *         the supplied exception handler and normal priority
-   */
-  public static ThreadFactory newDaemonThreadFactory(final String prefix,
-      final UncaughtExceptionHandler handler) {
-    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
-    return new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = namedFactory.newThread(r);
-        if (handler != null) {
-          t.setUncaughtExceptionHandler(handler);
-        } else {
-          t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
-        }
-        if (!t.isDaemon()) {
-          t.setDaemon(true);
-        }
-        if (t.getPriority() != Thread.NORM_PRIORITY) {
-          t.setPriority(Thread.NORM_PRIORITY);
-        }
-        return t;
-      }
-    };
-  }
-
   /** Sets an UncaughtExceptionHandler for the thread which logs the
    * Exception stack if the thread dies.
    */
@@ -273,7 +205,7 @@ public class Threads {
     t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
   }
-  private static interface PrintThreadInfoHelper {
+  private interface PrintThreadInfoHelper {
     void printThreadInfo(PrintStream stream, String title);
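
The helpers removed above did three things besides naming: they forced daemon status, normalized priority, and fell back to LOGGING_EXCEPTION_HANDLER when no handler was given. Guava's builder can express each of these explicitly. A hedged sketch of the equivalent, not part of this commit (note the call sites in this diff set only the name format and, where needed, the handler); DaemonFactorySketch and LOGGING_HANDLER are illustrative names standing in for HBase's Threads.LOGGING_EXCEPTION_HANDLER:

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.LoggerFactory;

public class DaemonFactorySketch {
  // Stand-in for HBase's Threads.LOGGING_EXCEPTION_HANDLER.
  static final UncaughtExceptionHandler LOGGING_HANDLER =
    (t, e) -> LoggerFactory.getLogger(DaemonFactorySketch.class)
      .warn("Thread:" + t + " exited with Exception:", e);

  static ThreadFactory namedDaemonFactory(String prefix) {
    return new ThreadFactoryBuilder()
      .setNameFormat(prefix + "-pool-%d")    // named threads, per-factory counter
      .setDaemon(true)                       // the removed helper always set daemon
      .setPriority(Thread.NORM_PRIORITY)     // the removed helper normalized priority
      .setUncaughtExceptionHandler(LOGGING_HANDLER)
      .build();
  }
}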

View File

@@ -34,9 +34,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,7 +130,7 @@ public class AsyncClientExample extends Configured implements Tool {
     TableName tableName = TableName.valueOf(args[0]);
     int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
     ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
-      Threads.newDaemonThreadFactory("AsyncClientExample"));
+      new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d").build());
     // We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
     // need a thread pool and may have a better performance if you use it correctly as it can save
     // some context switches. But if you use RawAsyncTable incorrectly, you may have a very bad

View File

@@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.chaos.policies;
 import org.apache.hadoop.hbase.chaos.actions.Action;
 import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -42,7 +42,7 @@ public class TwoConcurrentActionPolicy extends PeriodicPolicy {
     this.actionsOne = actionsOne;
     this.actionsTwo = actionsTwo;
     executor = Executors.newFixedThreadPool(2,
-      Threads.newDaemonThreadFactory("TwoConcurrentAction"));
+      new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d").build());
   }
   @Override

View File

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,8 +101,8 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
     // Create the thread pool that will execute RPCs
     threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
-      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
-        getUncaughtExceptionHandler()));
+      new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
+        .setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
     return true;
   }

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,14 +60,10 @@ public class FifoRpcScheduler extends RpcScheduler {
   public void start() {
     LOG.info("Using {} as user call queue; handlerCount={}; maxQueueLength={}",
       this.getClass().getSimpleName(), handlerCount, maxQueueLength);
-    this.executor = new ThreadPoolExecutor(
-      handlerCount,
-      handlerCount,
-      60,
-      TimeUnit.SECONDS,
-      new ArrayBlockingQueue<>(maxQueueLength),
-      Threads.newDaemonThreadFactory("FifoRpcScheduler.handler"),
-      new ThreadPoolExecutor.CallerRunsPolicy());
+    this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
+      new ArrayBlockingQueue<>(maxQueueLength),
+      new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").build(),
+      new ThreadPoolExecutor.CallerRunsPolicy());
   }
   @Override

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -71,13 +71,14 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler {
       this.getClass().getSimpleName(), handlerCount, maxQueueLength, rsReportHandlerCount,
       rsRsreportMaxQueueLength);
     this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
-      new ArrayBlockingQueue<Runnable>(maxQueueLength),
-      Threads.newDaemonThreadFactory("MasterFifoRpcScheduler.call.handler"),
-      new ThreadPoolExecutor.CallerRunsPolicy());
-    this.rsReportExecutor = new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60,
-      TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(rsRsreportMaxQueueLength),
-      Threads.newDaemonThreadFactory("MasterFifoRpcScheduler.RSReport.handler"),
-      new ThreadPoolExecutor.CallerRunsPolicy());
+      new ArrayBlockingQueue<>(maxQueueLength),
+      new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.call.handler-pool-%d")
+        .build(), new ThreadPoolExecutor.CallerRunsPolicy());
+    this.rsReportExecutor =
+      new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(rsRsreportMaxQueueLength),
+        new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.RSReport.handler-pool-%d")
+          .build(), new ThreadPoolExecutor.CallerRunsPolicy());
   }
   @Override

View File

@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
@@ -245,8 +245,9 @@ public class ClusterStatusPublisher extends ScheduledChore {
   @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
   public static class MulticastPublisher implements Publisher {
     private DatagramChannel channel;
-    private final EventLoopGroup group = new NioEventLoopGroup(
-        1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));
+    private final EventLoopGroup group = new NioEventLoopGroup(1,
+      new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
+        .build());
     public MulticastPublisher() {
     }

View File

@@ -68,9 +68,9 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -679,7 +679,7 @@ public class SplitTableRegionProcedure
     LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
       getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
     final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
-      Threads.newDaemonThreadFactory("StoreFileSplitter-%1$d"));
+      new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").build());
     final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
     // Split each store file.

View File

@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +51,8 @@ public class DirScanPool implements ConfigurationObserver {
   }
   private static ThreadPoolExecutor initializePool(int size) {
-    return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES, "dir-scan");
+    return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
+      new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").build());
   }
   /**

View File

@@ -27,7 +27,7 @@ import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -64,7 +64,8 @@ public class NamedQueueRecorder {
     // disruptor initialization with BlockingWaitStrategy
     this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
       getEventCount(eventCount),
-      Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
+      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
+        .build(),
       ProducerType.MULTI,
       new BlockingWaitStrategy());
     this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());

View File

@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,8 +112,9 @@ public class ProcedureCoordinator {
   public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
       long keepAliveMillis) {
     return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
       new SynchronousQueue<>(),
-      Threads.newDaemonThreadFactory("(" + coordName + ")-proc-coordinator"));
+      new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d")
+        .build());
   }
   /**

View File

@@ -27,7 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,8 +86,9 @@ public class ProcedureMember implements Closeable {
   public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
       long keepAliveMillis) {
     return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
       new SynchronousQueue<>(),
-      Threads.newDaemonThreadFactory("member: '" + memberName + "' subprocedure"));
+      new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
+        .build());
   }
   /**

View File

@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.zookeeper.KeeperException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -227,7 +228,7 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
     this.name = name;
     executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
-      "rs(" + name + ")-flush-proc");
+      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d").build());
     taskPool = new ExecutorCompletionService<>(executor);
   }

View File

@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.htrace.core.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -517,8 +518,9 @@ class MemStoreFlusher implements FlushRequester {
   }
   synchronized void start(UncaughtExceptionHandler eh) {
-    ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory(
-      server.getServerName().toShortString() + "-MemStoreFlusher", eh);
+    ThreadFactory flusherThreadFactory = new ThreadFactoryBuilder()
+      .setNameFormat(server.getServerName().toShortString() + "-MemStoreFlusher-pool-%d")
+      .setUncaughtExceptionHandler(eh).build();
     for (int i = 0; i < flushHandlers.length; i++) {
       flushHandlers[i] = new FlushHandler("MemStoreFlusher." + i);
       flusherThreadFactory.newThread(flushHandlers[i]);

View File

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -283,7 +284,7 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
     int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
     this.name = name;
     executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
-      "rs(" + name + ")-snapshot");
+      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d").build());
     taskPool = new ExecutorCompletionService<>(executor);
   }

View File

@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -242,9 +241,9 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     // Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
     // spinning as other strategies do.
     this.disruptor = new Disruptor<>(RingBufferTruck::new,
       getPreallocatedEventCount(),
-      Threads.newDaemonThreadFactory(hostingThreadName + ".append"),
+      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d").build(),
       ProducerType.MULTI, new BlockingWaitStrategy());
     // Advance the ring buffer sequence so that it starts from 1 instead of 0,
     // because SyncFuture.NOT_DONE = 0.
     this.disruptor.getRingBuffer().next();

View File

@@ -21,11 +21,11 @@ package org.apache.hadoop.hbase.security.access;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -69,7 +69,7 @@ public class ZKPermissionWatcher extends ZKListener implements Closeable {
     String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
     this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
     executor = Executors.newSingleThreadExecutor(
-      Threads.newDaemonThreadFactory("zk-permission-watcher"));
+      new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").build());
   }
   public void start() throws KeeperException {

View File

@@ -26,7 +26,6 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -616,7 +616,7 @@ public final class SnapshotManifest {
   public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
     int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-      Threads.newDaemonThreadFactory(name));
+      new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").build());
   }
   /**

View File

@@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +108,7 @@ public class HFileContentValidator extends AbstractHBaseTool {
     int availableProcessors = Runtime.getRuntime().availableProcessors();
     int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
     return Executors.newFixedThreadPool(numThreads,
-      Threads.newDaemonThreadFactory("hfile-validator"));
+      new ThreadFactoryBuilder().setNameFormat("hfile-validator-pool-%d").build());
   }
   @Override

View File

@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1657,7 +1658,7 @@ public final class FSUtils {
     // run in multiple threads
     final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
-      Threads.newDaemonThreadFactory("FSRegionQuery"));
+      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").build());
     try {
       // ignore all file status items that are not of interest
       for (FileStatus regionStatus : statusList) {

View File

@@ -134,6 +134,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.zookeeper.KeeperException;
@@ -347,7 +348,8 @@ public class HBaseFsck extends Configured implements Closeable {
   private static ExecutorService createThreadPool(Configuration conf) {
     int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
-    return new ScheduledThreadPoolExecutor(numThreads, Threads.newDaemonThreadFactory("hbasefsck"));
+    return new ScheduledThreadPoolExecutor(numThreads,
+      new ThreadFactoryBuilder().setNameFormat("hbasefsck-pool-%d").build());
   }
   /**

View File

@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -232,7 +233,7 @@ public abstract class ModifyRegionUtils {
       "hbase.hregion.open.and.init.threads.max", 16));
     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.
       getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-        Threads.newDaemonThreadFactory(threadNamePrefix));
+        new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d").build());
     return regionOpenAndInitThreadPool;
   }
 }

View File

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +77,7 @@ abstract class OutputSink {
     this.controller = controller;
     this.entryBuffers = entryBuffers;
     this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
-      Threads.newDaemonThreadFactory("split-log-closeStream-"));
+      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").build());
     this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
   }

View File

@@ -44,9 +44,9 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,8 +90,9 @@ public class AcidGuaranteesTestTool extends AbstractHBaseTool {
     BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(
       maxThreads * HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
-      TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(toString() + "-shared"));
+    ThreadPoolExecutor tpe =
+      new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
+        new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").build());
     tpe.allowCoreThreadTimeOut(true);
     return tpe;
   }

View File

@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -137,8 +137,8 @@ public class TestAsyncTableGetMultiThreaded {
     LOG.info("====== Test started ======");
     int numThreads = 7;
     AtomicBoolean stop = new AtomicBoolean(false);
-    ExecutorService executor =
-      Executors.newFixedThreadPool(numThreads, Threads.newDaemonThreadFactory("TestAsyncGet-"));
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads,
+      new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d").build());
     List<Future<?>> futures = new ArrayList<>();
     IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
       run(stop);

View File

@@ -42,8 +42,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -102,8 +102,9 @@ public class TestOpenTableInCoprocessor {
     int maxThreads = 1;
     long keepAliveTime = 60;
     ThreadPoolExecutor pool =
       new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
-        new SynchronousQueue<>(), Threads.newDaemonThreadFactory("hbase-table"));
+        new SynchronousQueue<>(),
+        new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").build());
     pool.allowCoreThreadTimeOut(true);
     return pool;
   }

View File

@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master.assignment;
 import static org.junit.Assert.assertEquals;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorCompletionService;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -63,13 +63,9 @@ public class TestRegionStates {
   @BeforeClass
   public static void setUp() throws Exception {
     threadPool = Threads.getBoundedCachedThreadPool(32, 60L, TimeUnit.SECONDS,
-      Threads.newDaemonThreadFactory("ProcedureDispatcher",
-        new UncaughtExceptionHandler() {
-          @Override
-          public void uncaughtException(Thread t, Throwable e) {
-            LOG.warn("Failed thread " + t.getName(), e);
-          }
-        }));
+      new ThreadFactoryBuilder().setNameFormat("ProcedureDispatcher-pool-%d")
+        .setUncaughtExceptionHandler((t, e) -> LOG.warn("Failed thread " + t.getName(), e))
+        .build());
     executorService = new ExecutorCompletionService(threadPool);
   }

View File

@@ -24,19 +24,19 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,19 +115,18 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager {
     }
   }
-  public class SimpleSubprocedurePool implements Closeable, Abortable {
+  public static class SimpleSubprocedurePool implements Closeable, Abortable {
     private final ExecutorCompletionService<Void> taskPool;
-    private final ThreadPoolExecutor executor;
+    private final ExecutorService executor;
     private volatile boolean aborted;
     private final List<Future<Void>> futures = new ArrayList<>();
     private final String name;
     public SimpleSubprocedurePool(String name, Configuration conf) {
       this.name = name;
-      executor = new ThreadPoolExecutor(1, 1, 500,
-        TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
-        Threads.newDaemonThreadFactory("rs(" + name + ")-procedure"));
+      executor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d").build());
       taskPool = new ExecutorCompletionService<>(executor);
     }

View File

@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Layout;
 import org.apache.log4j.PatternLayout;
@@ -232,8 +232,8 @@ public class TestRegionServerReportForDuty {
   */
  @Test
  public void testReportForDutyWithRSRpcRetry() throws Exception {
-    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
-      new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("RSDelayedStart"));
+    ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
+      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").build());
    // Start a master and wait for it to become the active/primary master.
    // Use a random unique port

@@ -50,11 +50,11 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FutureUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -82,7 +82,8 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncFSWAL"));
+    GROUP = new NioEventLoopGroup(1,
+      new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d").build());
     CHANNEL_CLASS = NioSocketChannel.class;
     AbstractTestFSWAL.setUpBeforeClass();
   }
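
In the WAL tests the same factory feeds netty's event loop group. A sketch of that usage, assuming the shaded netty and guava packages these tests already use; the class name here is illustrative only:

import java.util.concurrent.ThreadFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;

public class EventLoopNamingSketch {
  public static void main(String[] args) {
    ThreadFactory tf = new ThreadFactoryBuilder()
      .setNameFormat("TestAsyncFSWAL-pool-%d").build();
    // NioEventLoopGroup(int, ThreadFactory) creates its event-loop threads through
    // the factory, so they appear as "TestAsyncFSWAL-pool-0" etc. in thread dumps.
    NioEventLoopGroup group = new NioEventLoopGroup(1, tf);
    group.shutdownGracefully();
  }
}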

@@ -25,9 +25,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -51,7 +51,8 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    GROUP = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("TestAsyncWALReplay"));
+    GROUP = new NioEventLoopGroup(1,
+      new ThreadFactoryBuilder().setNameFormat("TestAsyncWALReplay-pool-%d").build());
     CHANNEL_CLASS = NioSocketChannel.class;
     Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");

@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
 import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -64,8 +65,9 @@ public class TestHBaseFsckMOB extends BaseTestHBaseFsck {
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
     TEST_UTIL.startMiniCluster(1);
 
-    tableExecutorService = new ThreadPoolExecutor(1, POOL_SIZE, 60, TimeUnit.SECONDS,
-        new SynchronousQueue<>(), Threads.newDaemonThreadFactory("testhbck"));
+    tableExecutorService =
+      new ThreadPoolExecutor(1, POOL_SIZE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
+        new ThreadFactoryBuilder().setNameFormat("testhbck-pool-%d").build());
 
     hbfsckExecutorService = new ScheduledThreadPoolExecutor(POOL_SIZE);

@@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -150,9 +150,8 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
 
   public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
     this.handler = hand;
     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
-    pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50,
-        TimeUnit.MILLISECONDS, queue,
-        Threads.newDaemonThreadFactory("IncrementCoalescer"));
+    pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
+      new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").build());
     MBeans.register("thrift", "Thrift", this);
   }
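
Because CORE_POOL_SIZE is used for both the core and maximum pool size with an unbounded queue, this pool is effectively fixed-size, and the 50 ms keep-alive only has an effect if core threads are allowed to time out. A sketch of that behavior; the pool size here is a placeholder, not the class's actual constant:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class CoalescerPoolSketch {
  public static void main(String[] args) {
    int corePoolSize = 2; // placeholder for IncrementCoalescer.CORE_POOL_SIZE
    ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 50,
      TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").build());
    // Without this call the keep-alive is inert: core threads never time out.
    pool.allowCoreThreadTimeOut(true);
    pool.execute(() -> System.out.println(Thread.currentThread().getName()));
    pool.shutdown();
  }
}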

@@ -36,8 +36,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -95,8 +95,8 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
   // and further prevents deadlocks if the process method itself makes other zookeeper calls.
   // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
   // requests using a single while loop and hence there is no performance degradation.
-  private final ExecutorService zkEventProcessor =
-    Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor"));
+  private final ExecutorService zkEventProcessor = Executors.newSingleThreadExecutor(
+    new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d").build());
 
   private final Configuration conf;
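
The comment above the field is why a plain single-thread executor suffices here. A sketch of the FIFO guarantee it relies on, illustrative only and using the same shaded guava builder:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ZkEventOrderSketch {
  public static void main(String[] args) {
    ExecutorService zkEventProcessor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d").build());
    // A single-thread executor runs submitted tasks strictly in submission
    // order, so watch callbacks cannot be reordered even though they are
    // processed off the ZooKeeper event thread.
    for (int i = 0; i < 5; i++) {
      final int event = i;
      zkEventProcessor.execute(() -> System.out.println("processed event " + event));
    }
    zkEventProcessor.shutdown(); // prints events 0..4 in order
  }
}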