HBASE-15637 TSHA Thrift-2 server should allow limiting call queue size
This commit is contained in:
parent
8f9e46a640
commit
0bb18de91c
|
@ -143,6 +143,8 @@ public class ThriftServer extends Configured implements Tool {
|
|||
options.addOption("f", "framed", false, "Use framed transport");
|
||||
options.addOption("c", "compact", false, "Use the compact protocol");
|
||||
options.addOption("w", "workers", true, "How many worker threads to use.");
|
||||
options.addOption("q", "callQueueSize", true,
|
||||
"Max size of request queue (unbounded by default)");
|
||||
options.addOption("h", "help", false, "Print help information");
|
||||
options.addOption(null, "infoport", true, "Port for web UI");
|
||||
options.addOption("t", READ_TIMEOUT_OPTION, true,
|
||||
|
@ -251,7 +253,7 @@ public class ThriftServer extends Configured implements Tool {
|
|||
|
||||
private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
|
||||
TProcessor processor, TTransportFactory transportFactory,
|
||||
int workerThreads,
|
||||
int workerThreads, int maxCallQueueSize,
|
||||
InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
|
||||
throws TTransportException {
|
||||
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
|
||||
|
@ -262,7 +264,7 @@ public class ThriftServer extends Configured implements Tool {
|
|||
serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
|
||||
}
|
||||
ExecutorService executorService = createExecutor(
|
||||
workerThreads, metrics);
|
||||
workerThreads, maxCallQueueSize, metrics);
|
||||
serverArgs.executorService(executorService);
|
||||
serverArgs.processor(processor);
|
||||
serverArgs.transportFactory(transportFactory);
|
||||
|
@ -271,9 +273,14 @@ public class ThriftServer extends Configured implements Tool {
|
|||
}
|
||||
|
||||
private static ExecutorService createExecutor(
|
||||
int workerThreads, ThriftMetrics metrics) {
|
||||
CallQueue callQueue = new CallQueue(
|
||||
new LinkedBlockingQueue<Call>(), metrics);
|
||||
int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
|
||||
CallQueue callQueue;
|
||||
if (maxCallQueueSize > 0) {
|
||||
callQueue = new CallQueue(new LinkedBlockingQueue<Call>(maxCallQueueSize), metrics);
|
||||
} else {
|
||||
callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
|
||||
}
|
||||
|
||||
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
||||
tfb.setDaemon(true);
|
||||
tfb.setNameFormat("thrift2-worker-%d");
|
||||
|
@ -342,6 +349,7 @@ public class ThriftServer extends Configured implements Tool {
|
|||
Options options = getOptions();
|
||||
CommandLine cmd = parseArguments(conf, options, args);
|
||||
int workerThreads = 0;
|
||||
int maxCallQueueSize = -1; // use unbounded queue by default
|
||||
|
||||
/**
|
||||
* This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
|
||||
|
@ -475,6 +483,10 @@ public class ThriftServer extends Configured implements Tool {
|
|||
workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
|
||||
}
|
||||
|
||||
if (cmd.hasOption("q")) {
|
||||
maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
|
||||
}
|
||||
|
||||
// check for user-defined info server port setting, if so override the conf
|
||||
try {
|
||||
if (cmd.hasOption("infoport")) {
|
||||
|
@ -508,6 +520,7 @@ public class ThriftServer extends Configured implements Tool {
|
|||
processor,
|
||||
transportFactory,
|
||||
workerThreads,
|
||||
maxCallQueueSize,
|
||||
inetSocketAddress,
|
||||
metrics);
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue