From 3a1e101dcd4cf0e7149c0d8b1c13f96adf04278e Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Fri, 15 May 2015 16:30:57 -0700 Subject: [PATCH] HBASE-13700 Change Thrift2 so that number of woker threads can be passed in --- .../hadoop/hbase/thrift2/ThriftServer.java | 43 ++++++++++++++++--- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index 72e911758fe..66ecc18eeae 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -127,6 +127,7 @@ public class ThriftServer { options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]"); options.addOption("f", "framed", false, "Use framed transport"); options.addOption("c", "compact", false, "Use the compact protocol"); + options.addOption("w", "workers", true, "How many worker threads to use."); options.addOption("h", "help", false, "Print help information"); options.addOption(null, "infoport", true, "Port for web UI"); @@ -233,11 +234,15 @@ public class ThriftServer { private static TServer getTHsHaServer(TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, + int workerThreads, InetSocketAddress inetSocketAddress, ThriftMetrics metrics) throws TTransportException { TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); + if (workerThreads > 0) { + serverArgs.workerThreads(workerThreads); + } ExecutorService executorService = createExecutor( serverArgs.getWorkerThreads(), metrics); serverArgs.executorService(executorService); @@ -254,18 +259,27 @@ public class ThriftServer { ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); tfb.setNameFormat("thrift2-worker-%d"); - return new ThreadPoolExecutor(workerThreads, workerThreads, + ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads, Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build()); + pool.prestartAllCoreThreads(); + return pool; } - private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, - TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException { + private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, + TProcessor processor, + TTransportFactory transportFactory, + int workerThreads, + InetSocketAddress inetSocketAddress) + throws TTransportException { TServerTransport serverTransport = new TServerSocket(inetSocketAddress); log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); serverArgs.processor(processor); serverArgs.transportFactory(transportFactory); serverArgs.protocolFactory(protocolFactory); + if (workerThreads > 0) { + serverArgs.maxWorkerThreads(workerThreads); + } return new TThreadPoolServer(serverArgs); } @@ -298,6 +312,7 @@ public class ThriftServer { Options options = getOptions(); Configuration conf = HBaseConfiguration.create(); CommandLine cmd = parseArguments(conf, options, args); + int workerThreads = 0; /** * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase @@ -414,6 +429,10 @@ public class ThriftServer { }; } + if (cmd.hasOption("w")) { + workerThreads = Integer.parseInt(cmd.getOptionValue("w")); + } + // check for user-defined info server port setting, if so override the conf try { if (cmd.hasOption("infoport")) { @@ -438,11 +457,23 @@ public class ThriftServer { } if (nonblocking) { - server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress); + server = getTNonBlockingServer(protocolFactory, + processor, + transportFactory, + inetSocketAddress); } else if (hsha) { - server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress, metrics); + server = getTHsHaServer(protocolFactory, + processor, + transportFactory, + workerThreads, + inetSocketAddress, + metrics); } else { - server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress); + server = getTThreadPoolServer(protocolFactory, + processor, + transportFactory, + workerThreads, + inetSocketAddress); } final TServer tserver = server;