HBASE-24750 : Adding default UncaughtExceptionHandler for Thread factories (ADDENDUM)
Closes #2231
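In short, every ThreadFactoryBuilder call site below now installs Threads.LOGGING_EXCEPTION_HANDLER, so exceptions escaping a pool thread are logged instead of being silently dropped. A minimal sketch of the pattern (the pool name and helper method are illustrative, not part of the patch):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.hadoop.hbase.util.Threads;
    import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

    class LoggingPoolExample {
      // Hypothetical helper: builds a fixed pool whose threads log uncaught
      // exceptions via Threads.LOGGING_EXCEPTION_HANDLER.
      static ExecutorService newLoggingPool(int threads) {
        return Executors.newFixedThreadPool(threads,
          new ThreadFactoryBuilder().setNameFormat("example-pool-%d")
            .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      }
    }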
parent 6fd7dcef28
commit ea130249ae
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -64,7 +65,8 @@ public class LogRollBackupSubprocedurePool implements Closeable, Abortable {
     this.name = name;
     executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.SECONDS,
       new LinkedBlockingQueue<>(),
-      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-backup-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     taskPool = new ExecutorCompletionService<>(executor);
   }
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -76,7 +77,8 @@ class AsyncConnectionImpl implements AsyncConnection {

   @VisibleForTesting
   static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(
-    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").build(), 10,
+    new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d")
+      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
     TimeUnit.MILLISECONDS);

   private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream;
@@ -180,7 +181,7 @@ class ClusterStatusListener implements Closeable {
     private DatagramChannel channel;
     private final EventLoopGroup group = new NioEventLoopGroup(1,
       new ThreadFactoryBuilder().setNameFormat("hbase-client-clusterStatusListener-pool-%d")
-        .build());
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

     public MulticastListener() {
     }
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.PoolMap;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -91,12 +92,14 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
   public static final Logger LOG = LoggerFactory.getLogger(AbstractRpcClient.class);

   protected static final HashedWheelTimer WHEEL_TIMER = new HashedWheelTimer(
-    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d").build(), 10,
+    new ThreadFactoryBuilder().setNameFormat("RpcClient-timer-pool-%d")
+      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10,
     TimeUnit.MILLISECONDS);

   private static final ScheduledExecutorService IDLE_CONN_SWEEPER = Executors
     .newScheduledThreadPool(1,
-      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

   protected boolean running = true; // if client runs
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
 import org.apache.hadoop.hbase.security.NettyHBaseRpcConnectionHeaderHandler;
 import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
 import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -77,8 +78,8 @@ class NettyRpcConnection extends RpcConnection {
   private static final Logger LOG = LoggerFactory.getLogger(NettyRpcConnection.class);

   private static final ScheduledExecutorService RELOGIN_EXECUTOR = Executors
-    .newSingleThreadScheduledExecutor(
-      new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d").build());
+    .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Relogin-pool-%d")
+      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

   private final NettyRpcClient rpcClient;
@@ -29,11 +29,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,16 +44,9 @@ import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 @InterfaceAudience.Private
 public class Threads {
   private static final Logger LOG = LoggerFactory.getLogger(Threads.class);
-  private static final AtomicInteger poolNumber = new AtomicInteger(1);

   public static final UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
-    new UncaughtExceptionHandler() {
-      @Override
-      public void uncaughtException(Thread t, Throwable e) {
-        LOG.warn("Thread:" + t + " exited with Exception:"
-          + StringUtils.stringifyException(e));
-      }
-    };
+    (t, e) -> LOG.warn("Thread:{} exited with Exception:{}", t, StringUtils.stringifyException(e));

   /**
    * Utility method that sets name, daemon status and starts passed thread.
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -130,7 +131,8 @@ public class AsyncClientExample extends Configured implements Tool {
     TableName tableName = TableName.valueOf(args[0]);
     int numOps = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_NUM_OPS;
     ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE,
-      new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("AsyncClientExample-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     // We use AsyncTable here so we need to provide a separated thread pool. RawAsyncTable does not
     // need a thread pool and may have a better performance if you use it correctly as it can save
     // some context switches. But if you use RawAsyncTable incorrectly, you may have a very bad
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.chaos.policies;

 import org.apache.hadoop.hbase.chaos.actions.Action;
 import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

@@ -42,7 +43,8 @@ public class TwoConcurrentActionPolicy extends PeriodicPolicy {
     this.actionsOne = actionsOne;
     this.actionsTwo = actionsTwo;
     executor = Executors.newFixedThreadPool(2,
-      new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("TwoConcurrentAction-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   @Override
@@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -62,7 +63,8 @@ public class FifoRpcScheduler extends RpcScheduler {
       this.getClass().getSimpleName(), handlerCount, maxQueueLength);
     this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
       new ArrayBlockingQueue<>(maxQueueLength),
-      new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d").build(),
+      new ThreadFactoryBuilder().setNameFormat("FifoRpcScheduler.handler-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
      new ThreadPoolExecutor.CallerRunsPolicy());
   }
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -73,12 +74,14 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler {
     this.executor = new ThreadPoolExecutor(handlerCount, handlerCount, 60, TimeUnit.SECONDS,
       new ArrayBlockingQueue<>(maxQueueLength),
       new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.call.handler-pool-%d")
-        .build(), new ThreadPoolExecutor.CallerRunsPolicy());
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
+      new ThreadPoolExecutor.CallerRunsPolicy());
     this.rsReportExecutor =
       new ThreadPoolExecutor(rsReportHandlerCount, rsReportHandlerCount, 60, TimeUnit.SECONDS,
         new ArrayBlockingQueue<>(rsRsreportMaxQueueLength),
         new ThreadFactoryBuilder().setNameFormat("MasterFifoRpcScheduler.RSReport.handler-pool-%d")
-          .build(), new ThreadPoolExecutor.CallerRunsPolicy());
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
+        new ThreadPoolExecutor.CallerRunsPolicy());
   }

   @Override
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -247,7 +248,7 @@ public class ClusterStatusPublisher extends ScheduledChore {
     private DatagramChannel channel;
     private final EventLoopGroup group = new NioEventLoopGroup(1,
       new ThreadFactoryBuilder().setNameFormat("hbase-master-clusterStatusPublisher-pool-%d")
-        .build());
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

     public MulticastPublisher() {
     }
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALSplitUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -679,7 +680,8 @@ public class SplitTableRegionProcedure
     LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
       getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
     final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
-      new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);

     // Split each store file.
@@ -52,7 +52,8 @@ public class DirScanPool implements ConfigurationObserver {

   private static ThreadPoolExecutor initializePool(int size) {
     return Threads.getBoundedCachedThreadPool(size, 1, TimeUnit.MINUTES,
-      new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("dir-scan-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   /**
@@ -27,6 +27,7 @@ import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -65,7 +66,7 @@ public class NamedQueueRecorder {
     this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
       getEventCount(eventCount),
       new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
-        .build(),
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
       ProducerType.MULTI,
       new BlockingWaitStrategy());
     this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -114,7 +115,7 @@ public class ProcedureCoordinator {
     return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
       new SynchronousQueue<>(),
       new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d")
-        .build());
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   /**
@@ -27,6 +27,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.hbase.errorhandling.ForeignException;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -88,7 +89,7 @@ public class ProcedureMember implements Closeable {
     return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
       new SynchronousQueue<>(),
       new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
-        .build());
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   /**
@@ -228,7 +228,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
     int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
     this.name = name;
     executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
-      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     taskPool = new ExecutorCompletionService<>(executor);
   }
@@ -284,7 +284,8 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
     int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
     this.name = name;
     executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
-      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     taskPool = new ExecutorCompletionService<>(executor);
   }
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -242,7 +243,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     // spinning as other strategies do.
     this.disruptor = new Disruptor<>(RingBufferTruck::new,
       getPreallocatedEventCount(),
-      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d").build(),
+      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".append-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
       ProducerType.MULTI, new BlockingWaitStrategy());
     // Advance the ring buffer sequence so that it starts from 1 instead of 0,
     // because SyncFuture.NOT_DONE = 0.
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.security.access;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@@ -69,7 +70,8 @@ public class ZKPermissionWatcher extends ZKListener implements Closeable {
     String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
     this.aclZNode = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, aclZnodeParent);
     executor = Executors.newSingleThreadExecutor(
-      new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("zk-permission-watcher-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   public void start() throws KeeperException {
@@ -616,7 +616,8 @@ public final class SnapshotManifest {
   public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
     int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-      new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   /**
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -108,7 +109,8 @@ public class HFileContentValidator extends AbstractHBaseTool {
     int availableProcessors = Runtime.getRuntime().availableProcessors();
     int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
     return Executors.newFixedThreadPool(numThreads,
-      new ThreadFactoryBuilder().setNameFormat("hfile-validator-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("hfile-validator-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   @Override
@@ -1658,7 +1658,8 @@ public final class FSUtils {

     // run in multiple threads
     final ExecutorService tpe = Executors.newFixedThreadPool(threadPoolSize,
-      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("FSRegionQuery-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     try {
       // ignore all file status items that are not of interest
       for (FileStatus regionStatus : statusList) {
@@ -349,7 +349,8 @@ public class HBaseFsck extends Configured implements Closeable {
   private static ExecutorService createThreadPool(Configuration conf) {
     int numThreads = conf.getInt("hbasefsck.numthreads", MAX_NUM_THREADS);
     return new ScheduledThreadPoolExecutor(numThreads,
-      new ThreadFactoryBuilder().setNameFormat("hbasefsck-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("hbasefsck-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }

   /**
@@ -233,7 +233,8 @@ public abstract class ModifyRegionUtils {
       "hbase.hregion.open.and.init.threads.max", 16));
     ThreadPoolExecutor regionOpenAndInitThreadPool = Threads.
       getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
-        new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d").build());
+        new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     return regionOpenAndInitThreadPool;
   }
 }
@@ -77,7 +77,8 @@ abstract class OutputSink {
     this.controller = controller;
     this.entryBuffers = entryBuffers;
     this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
-      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("split-log-closeStream-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
   }
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.util.AbstractHBaseTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -92,7 +93,8 @@ public class AcidGuaranteesTestTool extends AbstractHBaseTool {

     ThreadPoolExecutor tpe =
       new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
-        new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d").build());
+        new ThreadFactoryBuilder().setNameFormat(toString() + "-shared-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     tpe.allowCoreThreadTimeOut(true);
     return tpe;
   }
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -138,7 +139,8 @@ public class TestAsyncTableGetMultiThreaded {
     int numThreads = 7;
     AtomicBoolean stop = new AtomicBoolean(false);
     ExecutorService executor = Executors.newFixedThreadPool(numThreads,
-      new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("TestAsyncGet-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     List<Future<?>> futures = new ArrayList<>();
     IntStream.range(0, numThreads).forEach(i -> futures.add(executor.submit(() -> {
       run(stop);
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.junit.After;
@@ -103,8 +104,8 @@ public class TestOpenTableInCoprocessor {
     long keepAliveTime = 60;
     ThreadPoolExecutor pool =
       new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
-        new SynchronousQueue<>(),
-        new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d").build());
+        new SynchronousQueue<>(), new ThreadFactoryBuilder().setNameFormat("hbase-table-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     pool.allowCoreThreadTimeOut(true);
     return pool;
   }
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
@@ -126,7 +127,8 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager {
     public SimpleSubprocedurePool(String name, Configuration conf) {
       this.name = name;
       executor = Executors.newSingleThreadExecutor(
-        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d").build());
+        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
       taskPool = new ExecutorCompletionService<>(executor);
     }
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.log4j.Appender;
 import org.apache.log4j.Layout;
@@ -233,7 +234,8 @@ public class TestRegionServerReportForDuty {
   @Test
   public void testReportForDutyWithRSRpcRetry() throws Exception {
     ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
-      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("RSDelayedStart-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

     // Start a master and wait for it to become the active/primary master.
     // Use a random unique port
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -83,7 +84,8 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     GROUP = new NioEventLoopGroup(1,
-      new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("TestAsyncFSWAL-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     CHANNEL_CLASS = NioSocketChannel.class;
     AbstractTestFSWAL.setUpBeforeClass();
   }
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -52,7 +53,8 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     GROUP = new NioEventLoopGroup(1,
-      new ThreadFactoryBuilder().setNameFormat("TestAsyncWALReplay-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("TestAsyncWALReplay-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     CHANNEL_CLASS = NioSocketChannel.class;
     Configuration conf = AbstractTestWALReplay.TEST_UTIL.getConfiguration();
     conf.set(WALFactory.WAL_PROVIDER, "asyncfs");
@@ -67,7 +67,8 @@ public class TestHBaseFsckMOB extends BaseTestHBaseFsck {

     tableExecutorService =
       new ThreadPoolExecutor(1, POOL_SIZE, 60, TimeUnit.SECONDS, new SynchronousQueue<>(),
-        new ThreadFactoryBuilder().setNameFormat("testhbck-pool-%d").build());
+        new ThreadFactoryBuilder().setNameFormat("testhbck-pool-%d")
+          .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

     hbfsckExecutorService = new ScheduledThreadPoolExecutor(POOL_SIZE);
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -151,7 +152,8 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
     this.handler = hand;
     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
     pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
-      new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d").build());
+      new ThreadFactoryBuilder().setNameFormat("IncrementCoalescer-pool-%d")
+        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
     MBeans.register("thrift", "Thrift", this);
   }
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -96,7 +97,8 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
   // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the
   // requests using a single while loop and hence there is no performance degradation.
   private final ExecutorService zkEventProcessor = Executors.newSingleThreadExecutor(
-    new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d").build());
+    new ThreadFactoryBuilder().setNameFormat("zk-event-processor-pool-%d")
+      .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

   private final Configuration conf;