HBASE-15784 Misuse core/maxPoolSize of LinkedBlockingQueue in

ThreadPoolExecutor (Jingcheng Du)
This commit is contained in:
Ramkrishna 2016-05-18 12:40:43 +05:30
parent b2b3b1fa4d
commit 7b5d5394c0
12 changed files with 20 additions and 32 deletions

View File

@ -352,8 +352,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (batchPool == null) { if (batchPool == null) {
synchronized (this) { synchronized (this) {
if (batchPool == null) { if (batchPool == null) {
this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256), int threads = conf.getInt("hbase.hconnection.threads.max", 256);
conf.getInt("hbase.hconnection.threads.core", 256), "-shared", null); this.batchPool = getThreadPool(threads, threads, "-shared", null);
this.cleanupPool = true; this.cleanupPool = true;
} }
} }
@ -377,6 +377,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
new LinkedBlockingQueue<Runnable>(maxThreads * new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
coreThreads = maxThreads;
} }
ThreadPoolExecutor tpe = new ThreadPoolExecutor( ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads, coreThreads,
@ -397,9 +398,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
//To start with, threads.max.core threads can hit the meta (including replicas). //To start with, threads.max.core threads can hit the meta (including replicas).
//After that, requests will get queued up in the passed queue, and only after //After that, requests will get queued up in the passed queue, and only after
//the queue is full, a new thread will be started //the queue is full, a new thread will be started
int threads = conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128);
this.metaLookupPool = getThreadPool( this.metaLookupPool = getThreadPool(
conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128), threads,
conf.getInt("hbase.hconnection.meta.lookup.threads.core", 10), threads,
"-metaLookup-shared-", new LinkedBlockingQueue<Runnable>()); "-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
} }
} }

View File

@ -25,10 +25,6 @@
<name>hbase.defaults.for.version.skip</name> <name>hbase.defaults.for.version.skip</name>
<value>true</value> <value>true</value>
</property> </property>
<property>
<name>hbase.hconnection.meta.lookup.threads.core</name>
<value>4</value>
</property>
<property> <property>
<name>hbase.hconnection.threads.keepalivetime</name> <name>hbase.hconnection.threads.keepalivetime</name>
<value>3</value> <value>3</value>

View File

@ -107,7 +107,7 @@ public class HFileReplicator {
ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("HFileReplicationCallable-%1$d"); builder.setNameFormat("HFileReplicationCallable-%1$d");
this.exec = this.exec =
new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS, new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), builder.build()); new LinkedBlockingQueue<Runnable>(), builder.build());
this.exec.allowCoreThreadTimeOut(true); this.exec.allowCoreThreadTimeOut(true);
this.copiesPerThread = this.copiesPerThread =

View File

@ -235,20 +235,16 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
*/ */
private ExecutorService getDefaultThreadPool(Configuration conf) { private ExecutorService getDefaultThreadPool(Configuration conf) {
int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256); int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
int coreThreads = conf.getInt("hbase.region.replica.replication.threads.core", 16);
if (maxThreads == 0) { if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8; maxThreads = Runtime.getRuntime().availableProcessors() * 8;
} }
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60); long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue = LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads * new LinkedBlockingQueue<Runnable>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor( ThreadPoolExecutor tpe = new ThreadPoolExecutor(
coreThreads, maxThreads,
maxThreads, maxThreads,
keepAliveTime, keepAliveTime,
TimeUnit.SECONDS, TimeUnit.SECONDS,

View File

@ -135,20 +135,16 @@ public class MultiHConnection {
private void createBatchPool(Configuration conf) { private void createBatchPool(Configuration conf) {
// Use the same config for keep alive as in ConnectionImplementation.getBatchPool(); // Use the same config for keep alive as in ConnectionImplementation.getBatchPool();
int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256); int maxThreads = conf.getInt("hbase.multihconnection.threads.max", 256);
int coreThreads = conf.getInt("hbase.multihconnection.threads.core", 256);
if (maxThreads == 0) { if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8; maxThreads = Runtime.getRuntime().availableProcessors() * 8;
} }
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60); long keepAliveTime = conf.getLong("hbase.multihconnection.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue = LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<Runnable>(maxThreads new LinkedBlockingQueue<Runnable>(maxThreads
* conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)); HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = ThreadPoolExecutor tpe =
new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor(maxThreads, maxThreads, keepAliveTime, TimeUnit.SECONDS, workQueue,
Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-")); Threads.newDaemonThreadFactory("MultiHConnection" + "-shared-"));
tpe.allowCoreThreadTimeOut(true); tpe.allowCoreThreadTimeOut(true);
this.batchPool = tpe; this.batchPool = tpe;

View File

@ -57,7 +57,6 @@ public class TestHBaseFsckMOB extends BaseTestHBaseFsck {
conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.htable.threads.max", POOL_SIZE);
conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);

View File

@ -109,7 +109,6 @@ public class TestHBaseFsckOneRS extends BaseTestHBaseFsck {
conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.htable.threads.max", POOL_SIZE);
conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);

View File

@ -64,7 +64,6 @@ public class TestHBaseFsckReplicas extends BaseTestHBaseFsck {
conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.htable.threads.max", POOL_SIZE);
conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
TEST_UTIL.startMiniCluster(3); TEST_UTIL.startMiniCluster(3);

View File

@ -73,7 +73,6 @@ public class TestHBaseFsckTwoRS extends BaseTestHBaseFsck {
conf.setInt("hbase.htable.threads.max", POOL_SIZE); conf.setInt("hbase.htable.threads.max", POOL_SIZE);
conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE); conf.setInt("hbase.hconnection.threads.max", 2 * POOL_SIZE);
conf.setInt("hbase.hconnection.threads.core", POOL_SIZE);
conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT); conf.setInt("hbase.hbck.close.timeout", 2 * REGION_ONLINE_TIMEOUT);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 8 * REGION_ONLINE_TIMEOUT);
TEST_UTIL.startMiniCluster(2); TEST_UTIL.startMiniCluster(2);

View File

@ -162,10 +162,6 @@
Enable replay sanity checks on procedure tests. Enable replay sanity checks on procedure tests.
</description> </description>
</property> </property>
<property>
<name>hbase.hconnection.meta.lookup.threads.core</name>
<value>4</value>
</property>
<property> <property>
<name>hbase.hconnection.threads.keepalivetime</name> <name>hbase.hconnection.threads.keepalivetime</name>
<value>3</value> <value>3</value>

View File

@ -132,7 +132,7 @@ public class TBoundedThreadPoolServer extends TServer {
} }
/** Executor service for handling client connections */ /** Executor service for handling client connections */
private ExecutorService executorService; private ThreadPoolExecutor executorService;
/** Flag for stopping the server */ /** Flag for stopping the server */
private volatile boolean stopped; private volatile boolean stopped;
@ -142,9 +142,12 @@ public class TBoundedThreadPoolServer extends TServer {
public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) { public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
super(options); super(options);
int minWorkerThreads = options.minWorkerThreads;
int maxWorkerThreads = options.maxWorkerThreads;
if (options.maxQueuedRequests > 0) { if (options.maxQueuedRequests > 0) {
this.callQueue = new CallQueue( this.callQueue = new CallQueue(
new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics); new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
minWorkerThreads = maxWorkerThreads;
} else { } else {
this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics); this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
} }
@ -153,9 +156,10 @@ public class TBoundedThreadPoolServer extends TServer {
tfb.setDaemon(true); tfb.setDaemon(true);
tfb.setNameFormat("thrift-worker-%d"); tfb.setNameFormat("thrift-worker-%d");
executorService = executorService =
new ThreadPoolExecutor(options.minWorkerThreads, new ThreadPoolExecutor(minWorkerThreads,
options.maxWorkerThreads, options.threadKeepAliveTimeSec, maxWorkerThreads, options.threadKeepAliveTimeSec,
TimeUnit.SECONDS, this.callQueue, tfb.build()); TimeUnit.SECONDS, this.callQueue, tfb.build());
executorService.allowCoreThreadTimeOut(true);
serverOptions = options; serverOptions = options;
} }

View File

@ -555,7 +555,7 @@ public class ThriftServerRunner implements Runnable {
CallQueue callQueue = CallQueue callQueue =
new CallQueue(new LinkedBlockingQueue<Call>(), metrics); new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
ExecutorService executorService = createExecutor( ExecutorService executorService = createExecutor(
callQueue, serverArgs.getMinWorkerThreads(), serverArgs.getMaxWorkerThreads()); callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads());
serverArgs.executorService(executorService) serverArgs.executorService(executorService)
.processor(processor) .processor(processor)
.transportFactory(transportFactory) .transportFactory(transportFactory)
@ -620,8 +620,10 @@ public class ThriftServerRunner implements Runnable {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true); tfb.setDaemon(true);
tfb.setNameFormat("thrift-worker-%d"); tfb.setNameFormat("thrift-worker-%d");
return new ThreadPoolExecutor(minWorkers, maxWorkers, ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minWorkers, maxWorkers,
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
threadPool.allowCoreThreadTimeOut(true);
return threadPool;
} }
private InetAddress getBindAddress(Configuration conf) private InetAddress getBindAddress(Configuration conf)