HBASE-21652 Refactor ThriftServer making thrift2 server inherited from thrift1 server(addendum)

This commit is contained in:
Allan Yang 2019-01-10 11:09:26 +08:00
parent 58b11dcb1c
commit f59b99c48a
4 changed files with 43 additions and 15 deletions

View File

@ -56,8 +56,8 @@ public class ThriftMetrics {
this.source = source;
}
private MetricsThriftServerSource source;
private final long slowResponseTime;
protected MetricsThriftServerSource source;
protected final long slowResponseTime;
public static final String SLOW_RESPONSE_NANO_SEC =
"hbase.thrift.slow.response.nano.second";
public static final long DEFAULT_SLOW_RESPONSE_NANO_SEC = 10 * 1000 * 1000;
@ -147,7 +147,7 @@ public class ThriftMetrics {
}
}
private static Throwable unwrap(Throwable t) {
protected static Throwable unwrap(Throwable t) {
if (t == null) {
return t;
}

View File

@ -196,6 +196,10 @@ public class ThriftServer extends Configured implements Tool {
this.conf = HBaseConfiguration.create(conf);
}
protected ThriftMetrics createThriftMetrics(Configuration conf) {
return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
}
protected void setupParamters() throws IOException {
// login the server principal (if using secure Hadoop)
UserProvider userProvider = UserProvider.instantiate(conf);
@ -210,7 +214,7 @@ public class ThriftServer extends Configured implements Tool {
this.serviceUGI = userProvider.getCurrent().getUGI();
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
this.metrics = createThriftMetrics(conf);
this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
this.hbaseServiceHandler = createHandler(conf, userProvider);
this.hbaseServiceHandler.initMetrics(metrics);
@ -278,11 +282,19 @@ public class ThriftServer extends Configured implements Tool {
HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
}
/**
* the thrift server, not null means the server is started, for test only
* @return the tServer
*/
@VisibleForTesting
public TServer getTserver() {
return tserver;
}
/**
* the Jetty server, not null means the HTTP server is started, for test only
* @return the http server
*/
@VisibleForTesting
public Server getHttpServer() {
return httpServer;
@ -300,6 +312,17 @@ public class ThriftServer extends Configured implements Tool {
throw new ExitCodeException(exitCode, "");
}
/**
* Create a Servlet for the http server
* @param protocolFactory protocolFactory
* @return the servlet
* @throws IOException IOException
*/
protected TServlet createTServlet(TProtocolFactory protocolFactory) throws IOException {
return new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
conf, hbaseServiceHandler, securityEnabled, doAsEnabled);
}
/**
* Setup a HTTP Server using Jetty to serve calls from THttpClient
*
@ -307,8 +330,7 @@ public class ThriftServer extends Configured implements Tool {
*/
protected void setupHTTPServer() throws IOException {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
conf, hbaseServiceHandler, securityEnabled, doAsEnabled);
TServlet thriftHttpServlet = createTServlet(protocolFactory);
// Set the default max thread number to 100 to limit
// the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
@ -509,7 +531,7 @@ public class ThriftServer extends Configured implements Tool {
}
}
private TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
InetSocketAddress inetSocketAddress) {
LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
@ -520,7 +542,7 @@ public class ThriftServer extends Configured implements Tool {
return new TNonblockingServer(serverArgs);
}
private TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
InetSocketAddress inetSocketAddress) {
LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
@ -537,7 +559,7 @@ public class ThriftServer extends Configured implements Tool {
return new THsHaServer(serverArgs);
}
private TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
InetSocketAddress inetSocketAddress) {
LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
@ -557,7 +579,7 @@ public class ThriftServer extends Configured implements Tool {
return new TThreadedSelectorServer(serverArgs);
}
private TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
@ -576,7 +598,7 @@ public class ThriftServer extends Configured implements Tool {
return new TBoundedThreadPoolServer(serverArgs, metrics);
}
private TProtocolFactory getProtocolFactory() {
protected TProtocolFactory getProtocolFactory() {
TProtocolFactory protocolFactory;
if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
@ -590,7 +612,7 @@ public class ThriftServer extends Configured implements Tool {
return protocolFactory;
}
ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
protected ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
int minWorkers, int maxWorkers) {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true);
@ -601,7 +623,7 @@ public class ThriftServer extends Configured implements Tool {
return threadPool;
}
private InetAddress getBindAddress(Configuration conf)
protected InetAddress getBindAddress(Configuration conf)
throws UnknownHostException {
String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
return InetAddress.getByName(bindAddressStr);
@ -714,7 +736,7 @@ public class ThriftServer extends Configured implements Tool {
/**
* Parse the command line options to set parameters the conf.
*/
private void processOptions(final String[] args) throws Exception {
protected void processOptions(final String[] args) throws Exception {
if (args == null || args.length == 0) {
return;
}

View File

@ -145,7 +145,7 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH
}
}
ThriftHBaseServiceHandler(final Configuration conf,
public ThriftHBaseServiceHandler(final Configuration conf,
final UserProvider userProvider) throws IOException {
super(conf, userProvider);
isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.HBaseServiceHandler;
import org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.ToolRunner;
@ -77,6 +78,11 @@ public class ThriftServer extends org.apache.hadoop.hbase.thrift.ThriftServer {
return new ThriftHBaseServiceHandler(conf, userProvider);
}
@Override
protected ThriftMetrics createThriftMetrics(Configuration conf) {
return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
}
@Override
protected TProcessor createProcessor() {
return new THBaseService.Processor<>(HbaseHandlerMetricsProxy