HBASE-17181 Let HBase thrift2 support TThreadedSelectorServer

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
eyjian 2016-11-30 09:16:01 +08:00 committed by zhangduo
parent 682dd57cd6
commit 55645c351e
1 changed files with 45 additions and 3 deletions

View File

@ -47,10 +47,10 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.security.SaslUtil;
@ -63,8 +63,8 @@ import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@ -76,6 +76,7 @@ import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
@ -142,6 +143,7 @@ public class ThriftServer {
options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("w", "workers", true, "How many worker threads to use.");
options.addOption("s", "selectors", true, "How many selector threads to use.");
options.addOption("q", "callQueueSize", true,
"Max size of request queue (unbounded by default)");
options.addOption("h", "help", false, "Print help information");
@ -154,6 +156,7 @@ public class ThriftServer {
servers.addOption(
new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
options.addOptionGroup(servers);
return options;
@ -273,6 +276,30 @@ public class ThriftServer {
return new THsHaServer(serverArgs);
}
private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
TProcessor processor, TTransportFactory transportFactory,
int workerThreads, int selectorThreads, int maxCallQueueSize,
InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
throws TTransportException {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
if (workerThreads > 0) {
serverArgs.workerThreads(workerThreads);
}
if (selectorThreads > 0) {
serverArgs.selectorThreads(selectorThreads);
}
ExecutorService executorService = createExecutor(
workerThreads, maxCallQueueSize, metrics);
serverArgs.executorService(executorService);
serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory);
return new TThreadedSelectorServer(serverArgs);
}
private static ExecutorService createExecutor(
int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
CallQueue callQueue;
@ -344,6 +371,7 @@ public class ThriftServer {
Configuration conf = HBaseConfiguration.create();
CommandLine cmd = parseArguments(conf, options, args);
int workerThreads = 0;
int selectorThreads = 0;
int maxCallQueueSize = -1; // use unbounded queue by default
/**
@ -426,6 +454,7 @@ public class ThriftServer {
boolean nonblocking = cmd.hasOption("nonblocking");
boolean hsha = cmd.hasOption("hsha");
boolean selector = cmd.hasOption("selector");
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
@ -435,6 +464,8 @@ public class ThriftServer {
implType = "nonblocking";
} else if (hsha) {
implType = "hsha";
} else if (selector) {
implType = "selector";
}
conf.set("hbase.regionserver.thrift.server.type", implType);
@ -478,7 +509,9 @@ public class ThriftServer {
if (cmd.hasOption("w")) {
workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
}
if (cmd.hasOption("s")) {
selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
}
if (cmd.hasOption("q")) {
maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
}
@ -519,6 +552,15 @@ public class ThriftServer {
maxCallQueueSize,
inetSocketAddress,
metrics);
} else if (selector) {
server = getTThreadedSelectorServer(protocolFactory,
processor,
transportFactory,
workerThreads,
selectorThreads,
maxCallQueueSize,
inetSocketAddress,
metrics);
} else {
server = getTThreadPoolServer(protocolFactory,
processor,