HBASE-13700 Change Thrift2 so that number of woker threads can be passed in

This commit is contained in:
Elliott Clark 2015-05-15 16:30:57 -07:00
parent 81024a5b56
commit cecda74db1
1 changed files with 37 additions and 6 deletions

View File

@ -127,6 +127,7 @@ public class ThriftServer {
options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("w", "workers", true, "How many worker threads to use.");
options.addOption("h", "help", false, "Print help information");
options.addOption(null, "infoport", true, "Port for web UI");
@ -233,11 +234,15 @@ public class ThriftServer {
private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
TProcessor processor, TTransportFactory transportFactory,
int workerThreads,
InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
throws TTransportException {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
if (workerThreads > 0) {
serverArgs.workerThreads(workerThreads);
}
ExecutorService executorService = createExecutor(
serverArgs.getWorkerThreads(), metrics);
serverArgs.executorService(executorService);
@ -254,18 +259,27 @@ public class ThriftServer {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true);
tfb.setNameFormat("thrift2-worker-%d");
return new ThreadPoolExecutor(workerThreads, workerThreads,
ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
pool.prestartAllCoreThreads();
return pool;
}
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
TProcessor processor,
TTransportFactory transportFactory,
int workerThreads,
InetSocketAddress inetSocketAddress)
throws TTransportException {
TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory);
if (workerThreads > 0) {
serverArgs.maxWorkerThreads(workerThreads);
}
return new TThreadPoolServer(serverArgs);
}
@ -298,6 +312,7 @@ public class ThriftServer {
Options options = getOptions();
Configuration conf = HBaseConfiguration.create();
CommandLine cmd = parseArguments(conf, options, args);
int workerThreads = 0;
/**
* This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
@ -414,6 +429,10 @@ public class ThriftServer {
};
}
if (cmd.hasOption("w")) {
workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
}
// check for user-defined info server port setting, if so override the conf
try {
if (cmd.hasOption("infoport")) {
@ -438,11 +457,23 @@ public class ThriftServer {
}
if (nonblocking) {
server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
server = getTNonBlockingServer(protocolFactory,
processor,
transportFactory,
inetSocketAddress);
} else if (hsha) {
server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics);
server = getTHsHaServer(protocolFactory,
processor,
transportFactory,
workerThreads,
inetSocketAddress,
metrics);
} else {
server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
server = getTThreadPoolServer(protocolFactory,
processor,
transportFactory,
workerThreads,
inetSocketAddress);
}
final TServer tserver = server;