HBASE-13700 Change Thrift2 so that number of woker threads can be passed in

This commit is contained in:
Elliott Clark 2015-05-15 16:30:57 -07:00
parent 1bfe387956
commit 3a1e101dcd
1 changed files with 37 additions and 6 deletions

View File

@ -127,6 +127,7 @@ public class ThriftServer {
options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]"); options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
options.addOption("f", "framed", false, "Use framed transport"); options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol"); options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("w", "workers", true, "How many worker threads to use.");
options.addOption("h", "help", false, "Print help information"); options.addOption("h", "help", false, "Print help information");
options.addOption(null, "infoport", true, "Port for web UI"); options.addOption(null, "infoport", true, "Port for web UI");
@ -233,11 +234,15 @@ public class ThriftServer {
private static TServer getTHsHaServer(TProtocolFactory protocolFactory, private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
TProcessor processor, TTransportFactory transportFactory, TProcessor processor, TTransportFactory transportFactory,
int workerThreads,
InetSocketAddress inetSocketAddress, ThriftMetrics metrics) InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
throws TTransportException { throws TTransportException {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
if (workerThreads > 0) {
serverArgs.workerThreads(workerThreads);
}
ExecutorService executorService = createExecutor( ExecutorService executorService = createExecutor(
serverArgs.getWorkerThreads(), metrics); serverArgs.getWorkerThreads(), metrics);
serverArgs.executorService(executorService); serverArgs.executorService(executorService);
@ -254,18 +259,27 @@ public class ThriftServer {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true); tfb.setDaemon(true);
tfb.setNameFormat("thrift2-worker-%d"); tfb.setNameFormat("thrift2-worker-%d");
return new ThreadPoolExecutor(workerThreads, workerThreads, ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
pool.prestartAllCoreThreads();
return pool;
} }
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException { TProcessor processor,
TTransportFactory transportFactory,
int workerThreads,
InetSocketAddress inetSocketAddress)
throws TTransportException {
TServerTransport serverTransport = new TServerSocket(inetSocketAddress); TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
serverArgs.processor(processor); serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory); serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory); serverArgs.protocolFactory(protocolFactory);
if (workerThreads > 0) {
serverArgs.maxWorkerThreads(workerThreads);
}
return new TThreadPoolServer(serverArgs); return new TThreadPoolServer(serverArgs);
} }
@ -298,6 +312,7 @@ public class ThriftServer {
Options options = getOptions(); Options options = getOptions();
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
CommandLine cmd = parseArguments(conf, options, args); CommandLine cmd = parseArguments(conf, options, args);
int workerThreads = 0;
/** /**
* This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
@ -414,6 +429,10 @@ public class ThriftServer {
}; };
} }
if (cmd.hasOption("w")) {
workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
}
// check for user-defined info server port setting, if so override the conf // check for user-defined info server port setting, if so override the conf
try { try {
if (cmd.hasOption("infoport")) { if (cmd.hasOption("infoport")) {
@ -438,11 +457,23 @@ public class ThriftServer {
} }
if (nonblocking) { if (nonblocking) {
server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress); server = getTNonBlockingServer(protocolFactory,
processor,
transportFactory,
inetSocketAddress);
} else if (hsha) { } else if (hsha) {
server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics); server = getTHsHaServer(protocolFactory,
processor,
transportFactory,
workerThreads,
inetSocketAddress,
metrics);
} else { } else {
server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress); server = getTThreadPoolServer(protocolFactory,
processor,
transportFactory,
workerThreads,
inetSocketAddress);
} }
final TServer tserver = server; final TServer tserver = server;