HBASE-13460. Revise the MetaLookupPool executor-related defaults (introduced in HBASE-13036).
This commit is contained in:
parent
d6926629f9
commit
d314f7d9e0
|
@ -79,6 +79,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -364,7 +365,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (batchPool == null) {
|
if (batchPool == null) {
|
||||||
this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
|
this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
|
||||||
conf.getInt("hbase.hconnection.threads.core", 256), "-shared-");
|
conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
|
||||||
this.cleanupPool = true;
|
this.cleanupPool = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -372,7 +373,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
return this.batchPool;
|
return this.batchPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint) {
|
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
|
||||||
|
BlockingQueue<Runnable> passedWorkQueue) {
|
||||||
// shared HTable thread executor not yet initialized
|
// shared HTable thread executor not yet initialized
|
||||||
if (maxThreads == 0) {
|
if (maxThreads == 0) {
|
||||||
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||||
|
@ -381,10 +383,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
|
||||||
}
|
}
|
||||||
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
|
long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
|
||||||
LinkedBlockingQueue<Runnable> workQueue =
|
BlockingQueue<Runnable> workQueue = passedWorkQueue;
|
||||||
|
if (workQueue == null) {
|
||||||
|
workQueue =
|
||||||
new LinkedBlockingQueue<Runnable>(maxThreads *
|
new LinkedBlockingQueue<Runnable>(maxThreads *
|
||||||
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
|
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
|
||||||
|
}
|
||||||
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
|
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
|
||||||
coreThreads,
|
coreThreads,
|
||||||
maxThreads,
|
maxThreads,
|
||||||
|
@ -400,14 +405,14 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||||
if (this.metaLookupPool == null) {
|
if (this.metaLookupPool == null) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
if (this.metaLookupPool == null) {
|
if (this.metaLookupPool == null) {
|
||||||
//The meta lookup can happen on replicas of the meta (if the appropriate configs
|
//Some of the threads would be used for meta replicas
|
||||||
//are enabled).In a replicated-meta setup, the number '3' is assumed as the max
|
//To start with, threads.max.core threads can hit the meta (including replicas).
|
||||||
//number of replicas by default (unless it is configured to be of a higher value).
|
//After that, requests will get queued up in the passed queue, and only after
|
||||||
//In a non-replicated-meta setup, only one thread would be active.
|
//the queue is full, a new thread will be started
|
||||||
this.metaLookupPool = getThreadPool(
|
this.metaLookupPool = getThreadPool(
|
||||||
conf.getInt("hbase.hconnection.meta.lookup.threads.max", 3),
|
conf.getInt("hbase.hconnection.meta.lookup.threads.max", 128),
|
||||||
conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 3),
|
conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 10),
|
||||||
"-metaLookup-shared-");
|
"-metaLookup-shared-", new LinkedBlockingQueue<Runnable>());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue