From 52bc6db0502cd1e37a55fa97fe5ab32232b787cc Mon Sep 17 00:00:00 2001 From: Allan Yang Date: Thu, 10 Jan 2019 11:03:46 +0800 Subject: [PATCH] HBASE-21652 Refactor ThriftServer making thrift2 server inherited from thrift1 server(addendum) --- .../hadoop/hbase/thrift/ThriftMetrics.java | 6 +-- .../hadoop/hbase/thrift/ThriftServer.java | 44 ++++++++++++++----- .../thrift2/ThriftHBaseServiceHandler.java | 2 +- .../hadoop/hbase/thrift2/ThriftServer.java | 6 +++ 4 files changed, 43 insertions(+), 15 deletions(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java index f612eebfbad..8c4c6f08d2a 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java @@ -56,8 +56,8 @@ public class ThriftMetrics { this.source = source; } - private MetricsThriftServerSource source; - private final long slowResponseTime; + protected MetricsThriftServerSource source; + protected final long slowResponseTime; public static final String SLOW_RESPONSE_NANO_SEC = "hbase.thrift.slow.response.nano.second"; public static final long DEFAULT_SLOW_RESPONSE_NANO_SEC = 10 * 1000 * 1000; @@ -149,7 +149,7 @@ public class ThriftMetrics { } } - private static Throwable unwrap(Throwable t) { + protected static Throwable unwrap(Throwable t) { if (t == null) { return t; } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java index 830ce527b14..598b3063689 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java @@ -196,6 +196,10 @@ public class ThriftServer extends Configured implements Tool { this.conf = HBaseConfiguration.create(conf); } + protected ThriftMetrics createThriftMetrics(Configuration conf) { + return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); + } + protected void setupParamters() throws IOException { // login the server principal (if using secure Hadoop) UserProvider userProvider = UserProvider.instantiate(conf); @@ -210,7 +214,7 @@ public class ThriftServer extends Configured implements Tool { this.serviceUGI = userProvider.getCurrent().getUGI(); this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); - this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); + this.metrics = createThriftMetrics(conf); this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource()); this.hbaseServiceHandler = createHandler(conf, userProvider); this.hbaseServiceHandler.initMetrics(metrics); @@ -278,11 +282,19 @@ public class ThriftServer extends Configured implements Tool { HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf)); } + /** + * the thrift server, not null means the server is started, for test only + * @return the tServer + */ @VisibleForTesting public TServer getTserver() { return tserver; } + /** + * the Jetty server, not null means the HTTP server is started, for test only + * @return the http server + */ @VisibleForTesting public Server getHttpServer() { return httpServer; @@ -300,6 +312,17 @@ public class ThriftServer extends Configured implements Tool { throw new ExitCodeException(exitCode, ""); } + /** + * Create a Servlet for the http server + * @param protocolFactory protocolFactory + * @return the servlet + * @throws IOException IOException + */ + protected TServlet createTServlet(TProtocolFactory protocolFactory) throws IOException { + return new ThriftHttpServlet(processor, protocolFactory, serviceUGI, + conf, hbaseServiceHandler, securityEnabled, doAsEnabled); + } + /** * Setup a HTTP Server using Jetty to serve calls from THttpClient * @@ -307,8 +330,7 @@ public class ThriftServer extends Configured implements Tool { */ protected void setupHTTPServer() throws IOException { TProtocolFactory protocolFactory = new TBinaryProtocol.Factory(); - TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI, - conf, hbaseServiceHandler, securityEnabled, doAsEnabled); + TServlet thriftHttpServlet = createTServlet(protocolFactory); // Set the default max thread number to 100 to limit // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily. @@ -509,7 +531,7 @@ public class ThriftServer extends Configured implements Tool { } } - private TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport, + protected TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport, TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) { LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString()); @@ -520,7 +542,7 @@ public class ThriftServer extends Configured implements Tool { return new TNonblockingServer(serverArgs); } - private TServer getTHsHaServer(TNonblockingServerTransport serverTransport, + protected TServer getTHsHaServer(TNonblockingServerTransport serverTransport, TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) { LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString()); @@ -537,7 +559,7 @@ public class ThriftServer extends Configured implements Tool { return new THsHaServer(serverArgs); } - private TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport, + protected TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport, TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) { LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString()); @@ -557,7 +579,7 @@ public class ThriftServer extends Configured implements Tool { return new TThreadedSelectorServer(serverArgs); } - private TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, + protected TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception { LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); // Thrift's implementation uses '0' as a placeholder for 'use the default.' @@ -576,7 +598,7 @@ public class ThriftServer extends Configured implements Tool { return new TBoundedThreadPoolServer(serverArgs, metrics); } - private TProtocolFactory getProtocolFactory() { + protected TProtocolFactory getProtocolFactory() { TProtocolFactory protocolFactory; if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) { @@ -590,7 +612,7 @@ public class ThriftServer extends Configured implements Tool { return protocolFactory; } - ExecutorService createExecutor(BlockingQueue callQueue, + protected ExecutorService createExecutor(BlockingQueue callQueue, int minWorkers, int maxWorkers) { ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); tfb.setDaemon(true); @@ -601,7 +623,7 @@ public class ThriftServer extends Configured implements Tool { return threadPool; } - private InetAddress getBindAddress(Configuration conf) + protected InetAddress getBindAddress(Configuration conf) throws UnknownHostException { String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); return InetAddress.getByName(bindAddressStr); @@ -714,7 +736,7 @@ public class ThriftServer extends Configured implements Tool { /** * Parse the command line options to set parameters the conf. */ - private void processOptions(final String[] args) throws Exception { + protected void processOptions(final String[] args) throws Exception { if (args == null || args.length == 0) { return; } diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java index 519a16bf361..a112cef7cb3 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java @@ -145,7 +145,7 @@ public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements TH } } - ThriftHBaseServiceHandler(final Configuration conf, + public ThriftHBaseServiceHandler(final Configuration conf, final UserProvider userProvider) throws IOException { super(conf, userProvider); isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java index fa3d39db25b..e04a112bca8 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.thrift.HBaseServiceHandler; import org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy; +import org.apache.hadoop.hbase.thrift.ThriftMetrics; import org.apache.hadoop.hbase.thrift2.generated.THBaseService; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.ToolRunner; @@ -77,6 +78,11 @@ public class ThriftServer extends org.apache.hadoop.hbase.thrift.ThriftServer { return new ThriftHBaseServiceHandler(conf, userProvider); } + @Override + protected ThriftMetrics createThriftMetrics(Configuration conf) { + return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO); + } + @Override protected TProcessor createProcessor() { return new THBaseService.Processor<>(HbaseHandlerMetricsProxy