HBASE-21652 Refactor ThriftServer making thrift2 server inherited from thrift1 server

This commit is contained in:
Allan Yang 2019-01-02 16:13:17 +08:00
parent f0b50a8f9b
commit e4b6b4afb9
19 changed files with 2713 additions and 2813 deletions

org/apache/hadoop/hbase/thrift/Constants.java

@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Thrift related constants
*/
@InterfaceAudience.Private
public final class Constants {
private Constants(){}
public static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k
public static final String SERVER_TYPE_CONF_KEY =
"hbase.regionserver.thrift.server.type";
public static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
public static final boolean COMPACT_CONF_DEFAULT = false;
public static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
public static final boolean FRAMED_CONF_DEFAULT = false;
public static final String MAX_FRAME_SIZE_CONF_KEY =
"hbase.regionserver.thrift.framed.max_frame_size_in_mb";
public static final int MAX_FRAME_SIZE_CONF_DEFAULT = 2;
public static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
public static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
public static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min";
public static final int HTTP_MIN_THREADS_KEY_DEFAULT = 2;
public static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max";
public static final int HTTP_MAX_THREADS_KEY_DEFAULT = 100;
// ssl related configs
public static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled";
public static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store";
public static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY =
"hbase.thrift.ssl.keystore.password";
public static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY
= "hbase.thrift.ssl.keystore.keypassword";
public static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY =
"hbase.thrift.ssl.exclude.cipher.suites";
public static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY =
"hbase.thrift.ssl.include.cipher.suites";
public static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY =
"hbase.thrift.ssl.exclude.protocols";
public static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY =
"hbase.thrift.ssl.include.protocols";
public static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser";
//kerberos related configs
public static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface";
public static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver";
public static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal";
public static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file";
public static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal";
public static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file";
/**
* Amount of time in milliseconds before a server thread will timeout
* waiting for client to send data on a connected socket. Currently,
* applies only to TBoundedThreadPoolServer
*/
public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
"hbase.thrift.server.socket.read.timeout";
public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
/**
* Thrift quality of protection configuration key. Valid values can be:
* auth-conf: authentication, integrity and confidentiality checking
* auth-int: authentication and integrity checking
* auth: authentication only
*
* This is used to authenticate the callers and support impersonation.
* The thrift server and the HBase cluster must run in secure mode.
*/
public static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
public static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
public static final int BACKLOG_CONF_DEAFULT = 0;
public static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
public static final String DEFAULT_BIND_ADDR = "0.0.0.0";
public static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
public static final int DEFAULT_LISTEN_PORT = 9090;
public static final String THRIFT_HTTP_ALLOW_OPTIONS_METHOD =
"hbase.thrift.http.allow.options.method";
public static final boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false;
public static final String THRIFT_INFO_SERVER_PORT = "hbase.thrift.info.port";
public static final int THRIFT_INFO_SERVER_PORT_DEFAULT = 9095;
public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS = "hbase.thrift.info.bindAddress";
public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT = "0.0.0.0";
public static final String THRIFT_QUEUE_SIZE = "hbase.thrift.queue.size";
public static final int THRIFT_QUEUE_SIZE_DEFAULT = Integer.MAX_VALUE;
public static final String THRIFT_SELECTOR_NUM = "hbase.thrift.selector.num";
public static final String THRIFT_FILTERS = "hbase.thrift.filters";
// Command line options
public static final String READ_TIMEOUT_OPTION = "readTimeout";
public static final String MIN_WORKERS_OPTION = "minWorkers";
public static final String MAX_WORKERS_OPTION = "workers";
public static final String MAX_QUEUE_SIZE_OPTION = "queue";
public static final String SELECTOR_NUM_OPTION = "selectors";
public static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
public static final String BIND_OPTION = "bind";
public static final String COMPACT_OPTION = "compact";
public static final String FRAMED_OPTION = "framed";
public static final String PORT_OPTION = "port";
public static final String INFOPORT_OPTION = "infoport";
//for thrift2 server
public static final String READONLY_OPTION ="readonly";
public static final String THRIFT_READONLY_ENABLED = "hbase.thrift.readonly";
public static final boolean THRIFT_READONLY_ENABLED_DEFAULT = false;
}
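
For orientation, here is a minimal sketch (not part of the commit) of how a few of these keys could be set programmatically before the server is started; the class name and the concrete values are purely illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative only: applies some of the Constants keys above to a Configuration.
public class ThriftConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.regionserver.thrift.port", 9090);         // PORT_CONF_KEY / DEFAULT_LISTEN_PORT
    conf.setBoolean("hbase.regionserver.thrift.framed", true);   // FRAMED_CONF_KEY
    conf.setBoolean("hbase.regionserver.thrift.compact", true);  // COMPACT_CONF_KEY
    conf.setInt("hbase.thrift.info.port", 9095);                 // THRIFT_INFO_SERVER_PORT
    System.out.println(conf.getInt("hbase.regionserver.thrift.port", 9090));
  }
}
```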

org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java

@@ -0,0 +1,90 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.yetus.audience.InterfaceAudience;
/**
* abstract class for HBase handler
* providing a Connection cache and get table/admin method
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public abstract class HBaseServiceHandler {
public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
protected Configuration conf;
protected final ConnectionCache connectionCache;
public HBaseServiceHandler(final Configuration c,
final UserProvider userProvider) throws IOException {
this.conf = c;
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCache = new ConnectionCache(
conf, userProvider, cleanInterval, maxIdleTime);
}
protected ThriftMetrics metrics = null;
public void initMetrics(ThriftMetrics metrics) {
this.metrics = metrics;
}
public void setEffectiveUser(String effectiveUser) {
connectionCache.setEffectiveUser(effectiveUser);
}
/**
* Obtain HBaseAdmin. Creates the instance if it is not already created.
*/
protected Admin getAdmin() throws IOException {
return connectionCache.getAdmin();
}
/**
* Creates and returns a Table instance from a given table name.
*
* @param tableName
* name of table
* @return Table object
* @throws IOException if getting the table fails
*/
protected Table getTable(final byte[] tableName) throws IOException {
String table = Bytes.toString(tableName);
return connectionCache.getTable(table);
}
protected Table getTable(final ByteBuffer tableName) throws IOException {
return getTable(Bytes.getBytes(tableName));
}
}
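
To make the shared contract concrete, a hypothetical subclass might look like the sketch below; ExampleHandler and tableExists are invented names, while the constructor, getAdmin() and getTable() come from the class above.

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.HBaseServiceHandler;

// Hypothetical subclass; the real ones are the thrift and thrift2 ThriftHBaseServiceHandler classes.
public class ExampleHandler extends HBaseServiceHandler {
  public ExampleHandler(Configuration conf, UserProvider userProvider) throws IOException {
    super(conf, userProvider);
  }

  // Uses the Admin obtained through the shared ConnectionCache.
  public boolean tableExists(String name) throws IOException {
    return getAdmin().tableExists(TableName.valueOf(name));
  }
}
```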

org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java

@@ -25,9 +25,8 @@ import java.lang.reflect.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Converts a Hbase.Iface using InvocationHandler so that it reports process
@@ -36,10 +35,7 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public final class HbaseHandlerMetricsProxy implements InvocationHandler {
-private static final Logger LOG = LoggerFactory.getLogger(
-HbaseHandlerMetricsProxy.class);
-private final Hbase.Iface handler;
private final Object handler;
private final ThriftMetrics metrics;
public static Hbase.Iface newInstance(Hbase.Iface handler,
@@ -51,8 +47,18 @@ public final class HbaseHandlerMetricsProxy implements InvocationHandler {
new HbaseHandlerMetricsProxy(handler, metrics, conf));
}
// for thrift 2
public static THBaseService.Iface newInstance(THBaseService.Iface handler,
ThriftMetrics metrics,
Configuration conf) {
return (THBaseService.Iface) Proxy.newProxyInstance(
handler.getClass().getClassLoader(),
new Class[]{THBaseService.Iface.class},
new HbaseHandlerMetricsProxy(handler, metrics, conf));
}
private HbaseHandlerMetricsProxy(
-Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) {
Object handler, ThriftMetrics metrics, Configuration conf) {
this.handler = handler;
this.metrics = metrics;
}
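
The sketch below (illustrative class and method names) shows how the two newInstance overloads are meant to be used now that the proxy holds a plain Object; the signatures themselves are taken from the diff above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;

// Illustrative wrapper: both handler generations go through the same metrics proxy.
public class MetricsProxySketch {
  static Hbase.Iface wrap(Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) {
    return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
  }

  static THBaseService.Iface wrap(THBaseService.Iface handler, ThriftMetrics metrics,
      Configuration conf) {
    return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
  }
}
```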

org/apache/hadoop/hbase/thrift/ImplType.java

@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift;
import static org.apache.hadoop.hbase.thrift.Constants.SERVER_TYPE_CONF_KEY;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
/** An enum of server implementation selections */
@InterfaceAudience.Private
public enum ImplType {
HS_HA("hsha", true, THsHaServer.class, true),
NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true);
private static final Logger LOG = LoggerFactory.getLogger(ImplType.class);
public static final ImplType DEFAULT = THREAD_POOL;
final String option;
final boolean isAlwaysFramed;
final Class<? extends TServer> serverClass;
final boolean canSpecifyBindIP;
private ImplType(String option, boolean isAlwaysFramed,
Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
this.option = option;
this.isAlwaysFramed = isAlwaysFramed;
this.serverClass = serverClass;
this.canSpecifyBindIP = canSpecifyBindIP;
}
/**
* @return <code>-option</code>
*/
@Override
public String toString() {
return "-" + option;
}
public String getOption() {
return option;
}
public boolean isAlwaysFramed() {
return isAlwaysFramed;
}
public String getDescription() {
StringBuilder sb = new StringBuilder("Use the " +
serverClass.getSimpleName());
if (isAlwaysFramed) {
sb.append(" This implies the framed transport.");
}
if (this == DEFAULT) {
sb.append("This is the default.");
}
return sb.toString();
}
static OptionGroup createOptionGroup() {
OptionGroup group = new OptionGroup();
for (ImplType t : values()) {
group.addOption(new Option(t.option, t.getDescription()));
}
return group;
}
public static ImplType getServerImpl(Configuration conf) {
String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
for (ImplType t : values()) {
if (confType.equals(t.option)) {
return t;
}
}
throw new AssertionError("Unknown server ImplType.option:" + confType);
}
static void setServerImpl(CommandLine cmd, Configuration conf) {
ImplType chosenType = null;
int numChosen = 0;
for (ImplType t : values()) {
if (cmd.hasOption(t.option)) {
chosenType = t;
++numChosen;
}
}
if (numChosen < 1) {
LOG.info("Using default thrift server type");
chosenType = DEFAULT;
} else if (numChosen > 1) {
throw new AssertionError("Exactly one option out of " +
Arrays.toString(values()) + " has to be specified");
}
LOG.info("Using thrift server type " + chosenType.option);
conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
}
public String simpleClassName() {
return serverClass.getSimpleName();
}
public static List<String> serversThatCannotSpecifyBindIP() {
List<String> l = new ArrayList<>();
for (ImplType t : values()) {
if (!t.canSpecifyBindIP) {
l.add(t.simpleClassName());
}
}
return l;
}
}
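
A brief usage sketch for the extracted enum (class name invented, key and option names taken from the code above): picking the nonblocking implementation through the configuration and reading the choice back.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.thrift.ImplType;

// Illustrative only: resolves the configured server implementation.
public class ImplTypeSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // One of "hsha", "nonblocking", "threadpool", "threadedselector";
    // leaving the key unset falls back to ImplType.DEFAULT (the thread pool server).
    conf.set("hbase.regionserver.thrift.server.type", "nonblocking");
    ImplType type = ImplType.getServerImpl(conf);
    System.out.println(type.simpleClassName() + " framed=" + type.isAlwaysFramed());
  }
}
```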

org/apache/hadoop/hbase/thrift/IncrementCoalescer.java

@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -180,7 +179,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
new ConcurrentHashMap<>(100000, 0.75f, 1500);
private final ThreadPoolExecutor pool;
-private final HBaseHandler handler;
private final ThriftHBaseServiceHandler handler;
private int maxQueueSize = 500000;
private static final int CORE_POOL_SIZE = 1;
@@ -188,7 +187,7 @@
private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class);
@SuppressWarnings("deprecation")
-public IncrementCoalescer(HBaseHandler hand) {
public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
this.handler = hand;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
pool =
@@ -230,6 +229,7 @@
inc.getAmmount());
}
@SuppressWarnings("FutureReturnValueIgnored")
private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
byte[] qual, long ammount) throws TException {
int countersMapSize = countersMap.size();

org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java

@@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.thrift;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_PRINCIPAL_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -58,7 +58,7 @@ public class ThriftHttpServlet extends TServlet {
private static final Logger LOG = LoggerFactory.getLogger(ThriftHttpServlet.class.getName());
private final transient UserGroupInformation serviceUGI;
private final transient UserGroupInformation httpUGI;
-private final transient ThriftServerRunner.HBaseHandler hbaseHandler;
private final transient HBaseServiceHandler handler;
private final boolean doAsEnabled;
private final boolean securityEnabled;
@@ -67,11 +67,11 @@
public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
UserGroupInformation serviceUGI, Configuration conf,
-ThriftServerRunner.HBaseHandler hbaseHandler, boolean securityEnabled, boolean doAsEnabled)
HBaseServiceHandler handler, boolean securityEnabled, boolean doAsEnabled)
throws IOException {
super(processor, protocolFactory);
this.serviceUGI = serviceUGI;
-this.hbaseHandler = hbaseHandler;
this.handler = handler;
this.securityEnabled = securityEnabled;
this.doAsEnabled = doAsEnabled;
@@ -146,7 +146,7 @@
}
effectiveUser = doAsUserFromQuery;
}
-hbaseHandler.setEffectiveUser(effectiveUser);
handler.setEffectiveUser(effectiveUser);
super.doPost(request, response);
}
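
Because the servlet is only installed when the HTTP transport is active, a small configuration sketch (invented class name, keys from Constants) for enabling HTTP together with doAs impersonation might look like this:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Illustrative only: flags consulted by ThriftServer before it wires in ThriftHttpServlet.
public class HttpDoAsSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.regionserver.thrift.http", true);  // USE_HTTP_CONF_KEY
    conf.setBoolean("hbase.thrift.support.proxyuser", true);  // THRIFT_SUPPORT_PROXYUSER_KEY
    System.out.println(conf.getBoolean("hbase.regionserver.thrift.http", false));
  }
}
```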

org/apache/hadoop/hbase/thrift/ThriftServer.java

@@ -18,16 +18,132 @@
package org.apache.hadoop.hbase.thrift;
import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.SaslServer;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.http.HttpServerUtil;
import org.apache.hadoop.hbase.http.InfoServer;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.JvmPauseMonitor;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServlet;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
@@ -40,29 +156,36 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
* independent process.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class ThriftServer {
public class ThriftServer extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
-private static final String MIN_WORKERS_OPTION = "minWorkers";
-private static final String MAX_WORKERS_OPTION = "workers";
-private static final String MAX_QUEUE_SIZE_OPTION = "queue";
-private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
-static final String BIND_OPTION = "bind";
-static final String COMPACT_OPTION = "compact";
-static final String FRAMED_OPTION = "framed";
-static final String PORT_OPTION = "port";
-static final String INFOPORT_OPTION = "infoport";
-private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
-private static final int DEFAULT_LISTEN_PORT = 9090;
-private Configuration conf;
protected Configuration conf;
-ThriftServerRunner serverRunner;
-private InfoServer infoServer;
protected InfoServer infoServer;
protected TProcessor processor;
protected ThriftMetrics metrics;
protected HBaseServiceHandler hbaseServiceHandler;
protected UserGroupInformation serviceUGI;
protected boolean httpEnabled;
protected SaslUtil.QualityOfProtection qop;
protected String host;
protected int listenPort;
protected boolean securityEnabled;
protected boolean doAsEnabled;
protected JvmPauseMonitor pauseMonitor;
protected volatile TServer tserver;
protected volatile Server httpServer;
-private static final String READ_TIMEOUT_OPTION = "readTimeout";
//
// Main program and support routines
@@ -72,7 +195,89 @@
this.conf = HBaseConfiguration.create(conf);
}
-private static void printUsageAndExit(Options options, int exitCode)
protected void setupParamters() throws IOException {
// login the server principal (if using secure Hadoop)
UserProvider userProvider = UserProvider.instantiate(conf);
securityEnabled = userProvider.isHadoopSecurityEnabled()
&& userProvider.isHBaseSecurityEnabled();
if (securityEnabled) {
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
}
this.serviceUGI = userProvider.getCurrent().getUGI();
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
this.hbaseServiceHandler = createHandler(conf, userProvider);
this.hbaseServiceHandler.initMetrics(metrics);
this.processor = createProcessor();
httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
if (doAsEnabled && !httpEnabled) {
LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
}
String strQop = conf.get(THRIFT_QOP_KEY);
if (strQop != null) {
this.qop = SaslUtil.getQop(strQop);
}
if (qop != null) {
if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
qop != SaslUtil.QualityOfProtection.INTEGRITY &&
qop != SaslUtil.QualityOfProtection.PRIVACY) {
throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
THRIFT_QOP_KEY,
SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
SaslUtil.QualityOfProtection.INTEGRITY.name(),
SaslUtil.QualityOfProtection.PRIVACY.name()));
}
checkHttpSecurity(qop, conf);
if (!securityEnabled) {
throw new IOException("Thrift server must run in secure mode to support authentication");
}
}
registerFilters(conf);
pauseMonitor.start();
}
protected void startInfoServer() throws IOException {
// Put up info server.
int port = conf.getInt(THRIFT_INFO_SERVER_PORT , THRIFT_INFO_SERVER_PORT_DEFAULT);
if (port >= 0) {
conf.setLong("startcode", System.currentTimeMillis());
String a = conf
.get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
infoServer = new InfoServer("thrift", a, port, false, conf);
infoServer.setAttribute("hbase.conf", conf);
infoServer.start();
}
}
protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
!conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
THRIFT_SSL_ENABLED_KEY + " is false");
}
}
protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
throws IOException {
return new ThriftHBaseServiceHandler(conf, userProvider);
}
protected TProcessor createProcessor() {
return new Hbase.Processor<>(
HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
}
protected void printUsageAndExit(Options options, int exitCode)
throws ExitCodeException {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Thrift", null, options,
@@ -85,32 +290,333 @@
}
/**
-* Start up or shuts down the Thrift server, depending on the arguments.
-* @param args the arguments to pass in when starting the Thrift server
* Setup a HTTP Server using Jetty to serve calls from THttpClient
*
* @throws IOException IOException
*/
-void doMain(final String[] args) throws Exception {
-processOptions(args);
-serverRunner = new ThriftServerRunner(conf);
-// Put up info server.
-int port = conf.getInt("hbase.thrift.info.port", 9095);
protected void setupHTTPServer() throws IOException {
TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
conf, hbaseServiceHandler, securityEnabled, doAsEnabled);
// Set the default max thread number to 100 to limit
// the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
// Jetty set the default max thread number to 250, if we don't set it.
//
// Our default min thread number 2 is the same as that used by Jetty.
int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
HTTP_MIN_THREADS_KEY_DEFAULT));
int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
HTTP_MAX_THREADS_KEY_DEFAULT));
QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
threadPool.setMinThreads(minThreads);
httpServer = new Server(threadPool);
-if (port >= 0) {
-conf.setLong("startcode", System.currentTimeMillis());
-String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
-infoServer = new InfoServer("thrift", a, port, false, conf);
-infoServer.setAttribute("hbase.conf", conf);
-infoServer.start();
// Context handler
ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
ServletContextHandler.SESSIONS);
ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
HttpServerUtil.constrainHttpMethods(ctxHandler,
conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
// set up Jetty and run the embedded server
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setSecureScheme("https");
httpConfig.setSecurePort(listenPort);
httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
httpConfig.setSendServerVersion(false);
httpConfig.setSendDateHeader(false);
ServerConnector serverConnector;
if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
httpsConfig.addCustomizer(new SecureRequestCustomizer());
SslContextFactory sslCtxFactory = new SslContextFactory();
String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
String password = HBaseConfiguration.getPassword(conf,
THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
String keyPassword = HBaseConfiguration.getPassword(conf,
THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
sslCtxFactory.setKeyStorePath(keystore);
sslCtxFactory.setKeyStorePassword(password);
sslCtxFactory.setKeyManagerPassword(keyPassword);
String[] excludeCiphers = conf.getStrings(
THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
if (excludeCiphers.length != 0) {
sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
}
String[] includeCiphers = conf.getStrings(
THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
if (includeCiphers.length != 0) {
sslCtxFactory.setIncludeCipherSuites(includeCiphers);
}
-serverRunner.run();
// Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
String[] excludeProtocols = conf.getStrings(
THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
if (excludeProtocols.length != 0) {
sslCtxFactory.setExcludeProtocols(excludeProtocols);
}
String[] includeProtocols = conf.getStrings(
THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
if (includeProtocols.length != 0) {
sslCtxFactory.setIncludeProtocols(includeProtocols);
}
serverConnector = new ServerConnector(httpServer,
new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
new HttpConnectionFactory(httpsConfig));
} else {
serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
}
serverConnector.setPort(listenPort);
serverConnector.setHost(getBindAddress(conf).getHostAddress());
httpServer.addConnector(serverConnector);
httpServer.setStopAtShutdown(true);
if (doAsEnabled) {
ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
}
LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
}
/**
-* Parse the command line options to set parameters the conf.
* Setting up the thrift TServer
*/
-private void processOptions(final String[] args) throws Exception {
-Options options = new Options();
protected void setupServer() throws Exception {
// Construct correct ProtocolFactory
TProtocolFactory protocolFactory = getProtocolFactory();
ImplType implType = ImplType.getServerImpl(conf);
TProcessor processorToUse = processor;
// Construct correct TransportFactory
TTransportFactory transportFactory;
if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
if (qop != null) {
throw new RuntimeException("Thrift server authentication"
+ " doesn't work with framed transport yet");
}
transportFactory = new TFramedTransport.Factory(
conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
LOG.debug("Using framed transport");
} else if (qop == null) {
transportFactory = new TTransportFactory();
} else {
// Extract the name from the principal
String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
if (thriftKerberosPrincipal == null) {
throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
}
String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
new SaslRpcServer.SaslGssCallbackHandler() {
@Override
public void handle(Callback[] callbacks)
throws UnsupportedCallbackException {
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL GSSAPI Callback");
}
}
if (ac != null) {
String authid = ac.getAuthenticationID();
String authzid = ac.getAuthorizationID();
if (!authid.equals(authzid)) {
ac.setAuthorized(false);
} else {
ac.setAuthorized(true);
String userName = SecurityUtil.getUserFromPrincipal(authzid);
LOG.info("Effective user: {}", userName);
ac.setAuthorizedID(userName);
}
}
}
});
transportFactory = saslFactory;
// Create a processor wrapper, to get the caller
processorToUse = (inProt, outProt) -> {
TSaslServerTransport saslServerTransport =
(TSaslServerTransport)inProt.getTransport();
SaslServer saslServer = saslServerTransport.getSaslServer();
String principal = saslServer.getAuthorizationID();
hbaseServiceHandler.setEffectiveUser(principal);
return processor.process(inProt, outProt);
};
}
if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
LOG.error("Server types {} don't support IP address binding at the moment. See " +
"https://issues.apache.org/jira/browse/HBASE-2155 for details.",
Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
}
InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
implType == ImplType.THREADED_SELECTOR) {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
if (implType == ImplType.NONBLOCKING) {
tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
transportFactory, inetSocketAddress);
} else if (implType == ImplType.HS_HA) {
tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
inetSocketAddress);
} else { // THREADED_SELECTOR
tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
transportFactory, inetSocketAddress);
}
LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
Integer.toString(listenPort));
} else if (implType == ImplType.THREAD_POOL) {
this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
inetSocketAddress);
} else {
throw new AssertionError("Unsupported Thrift server implementation: " +
implType.simpleClassName());
}
// A sanity check that we instantiated the right type of server.
if (tserver.getClass() != implType.serverClass) {
throw new AssertionError("Expected to create Thrift server class " +
implType.serverClass.getName() + " but got " +
tserver.getClass().getName());
}
}
private TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
InetSocketAddress inetSocketAddress) {
LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory);
return new TNonblockingServer(serverArgs);
}
private TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
InetSocketAddress inetSocketAddress) {
LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
serverArgs.getMaxWorkerThreads());
ExecutorService executorService = createExecutor(
callQueue, workerThread, workerThread);
serverArgs.executorService(executorService).processor(processor)
.transportFactory(transportFactory).protocolFactory(protocolFactory);
return new THsHaServer(serverArgs);
}
private TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
InetSocketAddress inetSocketAddress) {
LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
TThreadedSelectorServer.Args serverArgs =
new HThreadedSelectorServerArgs(serverTransport, conf);
int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
serverArgs.getWorkerThreads());
int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
serverArgs.selectorThreads(selectorThreads);
ExecutorService executorService = createExecutor(
callQueue, workerThreads, workerThreads);
serverArgs.executorService(executorService).processor(processor)
.transportFactory(transportFactory).protocolFactory(protocolFactory);
return new TThreadedSelectorServer(serverArgs);
}
private TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
TServerTransport serverTransport = new TServerSocket(
new TServerSocket.ServerSocketTransportArgs().
bindAddr(inetSocketAddress).backlog(backlog).
clientTimeout(readTimeout));
TBoundedThreadPoolServer.Args serverArgs =
new TBoundedThreadPoolServer.Args(serverTransport, conf);
serverArgs.processor(processor).transportFactory(transportFactory)
.protocolFactory(protocolFactory);
return new TBoundedThreadPoolServer(serverArgs, metrics);
}
private TProtocolFactory getProtocolFactory() {
TProtocolFactory protocolFactory;
if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
LOG.debug("Using compact protocol");
protocolFactory = new TCompactProtocol.Factory();
} else {
LOG.debug("Using binary protocol");
protocolFactory = new TBinaryProtocol.Factory();
}
return protocolFactory;
}
ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
int minWorkers, int maxWorkers) {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true);
tfb.setNameFormat("thrift-worker-%d");
ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
threadPool.allowCoreThreadTimeOut(true);
return threadPool;
}
private InetAddress getBindAddress(Configuration conf)
throws UnknownHostException {
String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
return InetAddress.getByName(bindAddressStr);
}
public static void registerFilters(Configuration conf) {
String[] filters = conf.getStrings(THRIFT_FILTERS);
Splitter splitter = Splitter.on(':');
if(filters != null) {
for(String filterClass: filters) {
List<String> filterPart = splitter.splitToList(filterClass);
if(filterPart.size() != 2) {
LOG.warn("Invalid filter specification " + filterClass + " - skipping");
} else {
ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
}
}
}
}
/**
* Add options to command lines
* @param options options
*/
protected void addOptions(Options options) {
options.addOption("b", BIND_OPTION, true, "Address to bind " + options.addOption("b", BIND_OPTION, true, "Address to bind " +
"the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]"); "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " + options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
@@ -118,6 +624,7 @@
options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information");
options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
options.addOption("m", MIN_WORKERS_OPTION, true,
@@ -142,30 +649,24 @@
"only applies to TBoundedThreadPoolServer");
options.addOptionGroup(ImplType.createOptionGroup());
-CommandLineParser parser = new DefaultParser();
-CommandLine cmd = parser.parse(options, args);
-if (cmd.hasOption("help")) {
-printUsageAndExit(options, 1);
}
protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
// Get port to bind to
try {
if (cmd.hasOption(PORT_OPTION)) {
int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
-conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
conf.setInt(PORT_CONF_KEY, listenPort);
}
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the port option", e);
printUsageAndExit(options, -1);
}
// check for user-defined info server port setting, if so override the conf
try {
if (cmd.hasOption(INFOPORT_OPTION)) {
String val = cmd.getOptionValue(INFOPORT_OPTION);
-conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
LOG.debug("Web UI port set to " + val);
}
} catch (NumberFormatException e) {
@@ -173,7 +674,6 @@
" option", e);
printUsageAndExit(options, -1);
}
// Make optional changes to the configuration based on command-line options
optionToConf(cmd, MIN_WORKERS_OPTION,
conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
@@ -183,23 +683,42 @@
conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
-optionToConf(cmd, READ_TIMEOUT_OPTION, conf,
-ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
// Set general thrift server options
boolean compact = cmd.hasOption(COMPACT_OPTION) ||
-conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false);
conf.getBoolean(COMPACT_CONF_KEY, false);
-conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact);
conf.setBoolean(COMPACT_CONF_KEY, compact);
boolean framed = cmd.hasOption(FRAMED_OPTION) ||
-conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false);
conf.getBoolean(FRAMED_CONF_KEY, false);
-conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed);
conf.setBoolean(FRAMED_CONF_KEY, framed);
-if (cmd.hasOption(BIND_OPTION)) {
-conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
-}
optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
ImplType.setServerImpl(cmd, conf);
}
/**
* Parse the command line options to set parameters the conf.
*/
private void processOptions(final String[] args) throws Exception {
if (args == null || args.length == 0) {
return;
}
Options options = new Options();
addOptions(options);
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
if (cmd.hasOption("help")) {
printUsageAndExit(options, 1);
}
parseCommandLine(cmd, options);
}
public void stop() {
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
@@ -209,10 +728,25 @@
LOG.error("Failed to stop infoServer", ex);
}
}
-serverRunner.shutdown();
if (pauseMonitor != null) {
pauseMonitor.stop();
}
if (tserver != null) {
tserver.stop();
tserver = null;
}
if (httpServer != null) {
try {
httpServer.stop();
httpServer = null;
} catch (Exception e) {
LOG.error("Problem encountered in shutting down HTTP server", e);
}
httpServer = null;
}
}
-private static void optionToConf(CommandLine cmd, String option,
protected static void optionToConf(CommandLine cmd, String option,
Configuration conf, String destConfKey) {
if (cmd.hasOption(option)) {
String value = cmd.getOptionValue(option);
@@ -221,16 +755,38 @@
}
}
/**
* Run without any command line arguments
* @return exit code
* @throws Exception exception
*/
public int run() throws Exception {
return run(null);
}
@Override
public int run(String[] strings) throws Exception {
processOptions(strings);
setupParamters();
startInfoServer();
if (httpEnabled) {
setupHTTPServer();
httpServer.start();
httpServer.join();
} else {
setupServer();
tserver.serve();
}
return 0;
}
  public static void main(String [] args) throws Exception {
    LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
    VersionInfo.logVersion();
-    int exitCode = 0;
-    try {
-      new ThriftServer(HBaseConfiguration.create()).doMain(args);
-    } catch (ExitCodeException ex) {
-      exitCode = ex.getExitCode();
-    }
+    final Configuration conf = HBaseConfiguration.create();
+    // for now, only time we return is on an argument error.
+    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
    LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
-    System.exit(exitCode);
+    System.exit(status);
  }
}
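With doMain() and ExitCodeException handling gone, the server is driven as a plain Hadoop Tool.
A minimal embedding sketch that mirrors the new main() and the way the tests in this commit
launch the server; the port value and the trailing "start" argument are illustrative, option
names come from the new Constants class:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.thrift.Constants;
  import org.apache.hadoop.hbase.thrift.ThriftServer;
  import org.apache.hadoop.util.ToolRunner;

  public final class ThriftServerLaunchSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // ToolRunner strips generic Hadoop options, then calls ThriftServer.run(String[]),
      // which blocks serving requests until stop() is called from another thread.
      String[] serverArgs = {"-" + Constants.PORT_OPTION, "9090", "start"};
      System.exit(ToolRunner.run(conf, new ThriftServer(conf), serverArgs));
    }
  }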

@@ -18,6 +18,8 @@
 */
package org.apache.hadoop.hbase.thrift2;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.columnFamilyDescriptorFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.compareOpFromThrift;
@@ -44,10 +46,6 @@ import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.tableNamesFromHBas
import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -68,7 +66,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.thrift.ThriftMetrics;
+import org.apache.hadoop.hbase.thrift.HBaseServiceHandler;
import org.apache.hadoop.hbase.thrift2.generated.TAppend;
import org.apache.hadoop.hbase.thrift2.generated.TColumnFamilyDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.TCompareOp;
@@ -87,7 +85,6 @@ import org.apache.hadoop.hbase.thrift2.generated.TScan;
import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.TTableName;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.thrift.TException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -99,7 +96,7 @@ import org.slf4j.LoggerFactory;
 */
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
-public class ThriftHBaseServiceHandler implements THBaseService.Iface {
+public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements THBaseService.Iface {
  // TODO: Size of pool configuraple
  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
@@ -109,50 +106,10 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
  private final AtomicInteger nextScannerId = new AtomicInteger(0);
  private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
-  private final ConnectionCache connectionCache;
-  static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
-  static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
  private static final IOException ioe
      = new DoNotRetryIOException("Thrift Server is in Read-only mode.");
  private boolean isReadOnly;
public static THBaseService.Iface newInstance(
THBaseService.Iface handler, ThriftMetrics metrics) {
return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
}
private static final class THBaseServiceMetricsProxy implements InvocationHandler {
private final THBaseService.Iface handler;
private final ThriftMetrics metrics;
private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
this.handler = handler;
this.metrics = metrics;
}
@Override
public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
Object result;
long start = now();
try {
result = m.invoke(handler, args);
} catch (InvocationTargetException e) {
metrics.exception(e.getCause());
throw e.getTargetException();
} catch (Exception e) {
metrics.exception(e);
throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
} finally {
long processTime = now() - start;
metrics.incMethodTime(m.getName(), processTime);
}
return result;
}
}
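The removed newInstance/THBaseServiceMetricsProxy pair above is superseded by the shared
org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy. A minimal wiring sketch, assuming the
newInstance overload used by createProcessor() later in this diff accepts a THBaseService.Iface
and returns a proxy implementing the same interface:

  // Sketch only: wraps the handler so every Iface call is timed and counted,
  // then feeds the wrapped handler to the generated Thrift processor.
  static TProcessor metricsWrappedProcessor(THBaseService.Iface handler,
      ThriftMetrics metrics, Configuration conf) {
    return new THBaseService.Processor<>(
        HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf));
  }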
  private static class TIOErrorWithCause extends TIOError {
    private Throwable cause;
@@ -188,20 +145,14 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
    }
  }
-  private static long now() {
-    return System.nanoTime();
-  }
  ThriftHBaseServiceHandler(final Configuration conf,
      final UserProvider userProvider) throws IOException {
-    int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
-    int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
-    connectionCache = new ConnectionCache(
-      conf, userProvider, cleanInterval, maxIdleTime);
-    isReadOnly = conf.getBoolean("hbase.thrift.readonly", false);
+    super(conf, userProvider);
+    isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);
  }
-  private Table getTable(ByteBuffer tableName) {
+  @Override
+  protected Table getTable(ByteBuffer tableName) {
    try {
      return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
    } catch (IOException ie) {
@@ -251,10 +202,6 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
    return scannerMap.get(id);
  }
-  void setEffectiveUser(String effectiveUser) {
-    connectionCache.setEffectiveUser(effectiveUser);
-  }
  /**
   * Removes the scanner associated with the specified ID from the internal HashMap.
   * @param id of the Scanner to remove

@@ -18,355 +18,86 @@
 */
package org.apache.hadoop.hbase.thrift2;
+import static org.apache.hadoop.hbase.thrift.Constants.READONLY_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.security.PrivilegedAction;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.SaslServer;
+import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.http.InfoServer;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.thrift.CallQueue;
-import org.apache.hadoop.hbase.thrift.THBaseThreadPoolExecutor;
-import org.apache.hadoop.hbase.thrift.ThriftMetrics;
+import org.apache.hadoop.hbase.thrift.HBaseServiceHandler;
+import org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
-import org.apache.hadoop.hbase.util.DNS;
-import org.apache.hadoop.hbase.util.JvmPauseMonitor;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.server.TThreadedSelectorServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TSaslServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
/**
 * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in
 * the HbaseClient.thrift IDL file.
 */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS",
+    justification = "Change the name will be an incompatible change, will do it later")
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class ThriftServer extends Configured implements Tool {
+public class ThriftServer extends org.apache.hadoop.hbase.thrift.ThriftServer {
  private static final Logger log = LoggerFactory.getLogger(ThriftServer.class);
/**
* Thrift quality of protection configuration key. Valid values can be:
* privacy: authentication, integrity and confidentiality checking
* integrity: authentication and integrity checking
* authentication: authentication only
*
* This is used to authenticate the callers and support impersonation.
* The thrift server and the HBase cluster must run in secure mode.
*/
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
-  static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
-  public static final int DEFAULT_LISTEN_PORT = 9090;
-  private static final String READ_TIMEOUT_OPTION = "readTimeout";
-  /**
-   * Amount of time in milliseconds before a server thread will timeout
-   * waiting for client to send data on a connected socket. Currently,
-   * applies only to TBoundedThreadPoolServer
-   */
-  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
-    "hbase.thrift.server.socket.read.timeout";
-  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
-  public ThriftServer() {
+  public ThriftServer(Configuration conf) {
+    super(conf);
  }
-  private static void printUsage() {
+  @Override
+  protected void printUsageAndExit(Options options, int exitCode)
+      throws Shell.ExitCodeException {
    HelpFormatter formatter = new HelpFormatter();
-    formatter.printHelp("Thrift", null, getOptions(),
+    formatter.printHelp("Thrift", null, options,
        "To start the Thrift server run 'hbase-daemon.sh start thrift2' or " +
        "'hbase thrift2'\n" +
        "To shutdown the thrift server run 'hbase-daemon.sh stop thrift2' or" +
        " send a kill signal to the thrift server pid",
        true);
+    throw new Shell.ExitCodeException(exitCode, "");
  }
private static Options getOptions() {
Options options = new Options();
options.addOption("b", "bind", true,
"Address to bind the Thrift server to. [default: 0.0.0.0]");
options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT + "]");
options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("w", "workers", true, "How many worker threads to use.");
options.addOption("s", "selectors", true, "How many selector threads to use.");
options.addOption("q", "callQueueSize", true,
"Max size of request queue (unbounded by default)");
options.addOption("h", "help", false, "Print help information");
options.addOption(null, "infoport", true, "Port for web UI");
options.addOption("t", READ_TIMEOUT_OPTION, true,
"Amount of time in milliseconds before a server thread will timeout " +
"waiting for client to send data on a connected socket. Currently, " +
"only applies to TBoundedThreadPoolServer");
options.addOption("ro", "readonly", false,
"Respond only to read method requests [default: false]");
OptionGroup servers = new OptionGroup();
servers.addOption(new Option("nonblocking", false,
"Use the TNonblockingServer. This implies the framed transport."));
servers.addOption(new Option("hsha", false,
"Use the THsHaServer. This implies the framed transport."));
servers.addOption(new Option("selector", false,
"Use the TThreadedSelectorServer. This implies the framed transport."));
servers.addOption(new Option("threadpool", false,
"Use the TThreadPoolServer. This is the default."));
options.addOptionGroup(servers);
return options;
}
private static CommandLine parseArguments(Configuration conf, Options options, String[] args)
throws ParseException, IOException {
CommandLineParser parser = new DefaultParser();
return parser.parse(options, args);
}
private static TProtocolFactory getTProtocolFactory(boolean isCompact) {
if (isCompact) {
log.debug("Using compact protocol");
return new TCompactProtocol.Factory();
} else {
log.debug("Using binary protocol");
return new TBinaryProtocol.Factory();
}
}
private static TTransportFactory getTTransportFactory(
SaslUtil.QualityOfProtection qop, String name, String host,
boolean framed, int frameSize) {
if (framed) {
if (qop != null) {
throw new RuntimeException("Thrift server authentication"
+ " doesn't work with framed transport yet");
}
log.debug("Using framed transport");
return new TFramedTransport.Factory(frameSize);
} else if (qop == null) {
return new TTransportFactory();
} else {
Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
new SaslGssCallbackHandler() {
+  @Override
+  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
+      throws IOException {
+    return new ThriftHBaseServiceHandler(conf, userProvider);
+  }
-        @Override
-        public void handle(Callback[] callbacks)
-            throws UnsupportedCallbackException {
-          AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL GSSAPI Callback");
}
}
if (ac != null) {
String authid = ac.getAuthenticationID();
String authzid = ac.getAuthorizationID();
if (!authid.equals(authzid)) {
ac.setAuthorized(false);
} else {
ac.setAuthorized(true);
String userName = SecurityUtil.getUserFromPrincipal(authzid);
log.info("Effective user: " + userName);
ac.setAuthorizedID(userName);
}
}
}
});
+  @Override
+  protected TProcessor createProcessor() {
+    return new THBaseService.Processor<>(HbaseHandlerMetricsProxy
+        .newInstance((THBaseService.Iface) hbaseServiceHandler, metrics, conf));
+  }
-      return saslFactory;
-    }
-  }
-  /*
-   * If bindValue is null, we don't bind.
-   */
-  private static InetSocketAddress bindToPort(String bindValue, int listenPort)
throws UnknownHostException {
try {
if (bindValue == null) {
return new InetSocketAddress(listenPort);
} else {
return new InetSocketAddress(InetAddress.getByName(bindValue), listenPort);
}
} catch (UnknownHostException e) {
throw new RuntimeException("Could not bind to provided ip address", e);
}
+  @Override
+  protected void addOptions(Options options) {
+    super.addOptions(options);
+    options.addOption("ro", READONLY_OPTION, false,
+        "Respond only to read method requests [default: false]");
+  }
+  @Override
+  protected void parseCommandLine(CommandLine cmd, Options options) throws Shell.ExitCodeException {
+    super.parseCommandLine(cmd, options);
+    boolean readOnly = THRIFT_READONLY_ENABLED_DEFAULT;
+    if (cmd.hasOption(READONLY_OPTION)) {
+      readOnly = true;
-  }
-  private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory,
-      TProcessor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress)
-      throws TTransportException {
-    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
-    log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
-    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
-    serverArgs.processor(processor);
-    serverArgs.transportFactory(transportFactory);
-    serverArgs.protocolFactory(protocolFactory);
-    return new TNonblockingServer(serverArgs);
-  }
-  private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
-      TProcessor processor, TTransportFactory transportFactory,
-      int workerThreads, int maxCallQueueSize,
-      InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
-      throws TTransportException {
-    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
if (workerThreads > 0) {
// Could support the min & max threads, avoiding to preserve existing functionality.
serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
}
ExecutorService executorService = createExecutor(
workerThreads, maxCallQueueSize, metrics);
serverArgs.executorService(executorService);
serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory);
return new THsHaServer(serverArgs);
}
private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
TProcessor processor, TTransportFactory transportFactory,
int workerThreads, int selectorThreads, int maxCallQueueSize,
InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
throws TTransportException {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
if (workerThreads > 0) {
serverArgs.workerThreads(workerThreads);
}
if (selectorThreads > 0) {
serverArgs.selectorThreads(selectorThreads);
}
ExecutorService executorService = createExecutor(
workerThreads, maxCallQueueSize, metrics);
serverArgs.executorService(executorService);
serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory);
return new TThreadedSelectorServer(serverArgs);
}
private static ExecutorService createExecutor(
int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
CallQueue callQueue;
if (maxCallQueueSize > 0) {
callQueue = new CallQueue(new LinkedBlockingQueue<>(maxCallQueueSize), metrics);
} else {
callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
}
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true);
tfb.setNameFormat("thrift2-worker-%d");
ThreadPoolExecutor pool = new THBaseThreadPoolExecutor(workerThreads, workerThreads,
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
pool.prestartAllCoreThreads();
return pool;
}
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
TProcessor processor,
TTransportFactory transportFactory,
int workerThreads,
InetSocketAddress inetSocketAddress,
int backlog,
int clientTimeout,
ThriftMetrics metrics)
throws TTransportException {
TServerTransport serverTransport = new TServerSocket(
new TServerSocket.ServerSocketTransportArgs().
bindAddr(inetSocketAddress).backlog(backlog).
clientTimeout(clientTimeout));
log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory);
if (workerThreads > 0) {
serverArgs.maxWorkerThreads(workerThreads);
}
ThreadPoolExecutor executor = new THBaseThreadPoolExecutor(serverArgs.minWorkerThreads,
serverArgs.maxWorkerThreads, serverArgs.stopTimeoutVal, TimeUnit.SECONDS,
new SynchronousQueue<>(), metrics);
serverArgs.executorService(executor);
return new TThreadPoolServer(serverArgs);
}
/**
* Adds the option to pre-load filters at startup.
*
* @param conf The current configuration instance.
*/
protected static void registerFilters(Configuration conf) {
String[] filters = conf.getStrings("hbase.thrift.filters");
if(filters != null) {
for(String filterClass: filters) {
String[] filterPart = filterClass.split(":");
if(filterPart.length != 2) {
log.warn("Invalid filter specification " + filterClass + " - skipping");
} else {
ParseFilter.registerFilter(filterPart[0], filterPart[1]);
}
}
    }
+      conf.setBoolean(THRIFT_READONLY_ENABLED, readOnly);
  }
  /**
@@ -375,249 +106,8 @@ public class ThriftServer extends Configured implements Tool {
  public static void main(String[] args) throws Exception {
    final Configuration conf = HBaseConfiguration.create();
    // for now, only time we return is on an argument error.
-    final int status = ToolRunner.run(conf, new ThriftServer(), args);
+    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
    System.exit(status);
  }
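parseCommandLine() above folds the -ro flag into THRIFT_READONLY_ENABLED, which
ThriftHBaseServiceHandler reads in its constructor. A hedged sketch of starting the thrift2
server read-only without the flag (constant and argument names as used elsewhere in this diff):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.thrift.Constants;
  import org.apache.hadoop.hbase.thrift2.ThriftServer;
  import org.apache.hadoop.util.ToolRunner;

  public final class ReadOnlyThrift2Sketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // Equivalent to passing "-ro" on the command line.
      conf.setBoolean(Constants.THRIFT_READONLY_ENABLED, true);
      System.exit(ToolRunner.run(conf, new ThriftServer(conf), new String[] {"start"}));
    }
  }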
@Override
public int run(String[] args) throws Exception {
final Configuration conf = getConf();
Options options = getOptions();
CommandLine cmd = parseArguments(conf, options, args);
int workerThreads = 0;
int selectorThreads = 0;
int maxCallQueueSize = -1; // use unbounded queue by default
if (cmd.hasOption("help")) {
printUsage();
return 1;
}
// Get address to bind
String bindAddress = getBindAddress(conf, cmd);
// check if server should only process read requests, if so override the conf
if (cmd.hasOption("readonly")) {
conf.setBoolean("hbase.thrift.readonly", true);
if (log.isDebugEnabled()) {
log.debug("readonly set to true");
}
}
// Get read timeout
int readTimeout = getReadTimeout(conf, cmd);
// Get port to bind to
int listenPort = getListenPort(conf, cmd);
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
// Local hostname and user name, used only if QOP is configured.
String host = null;
String name = null;
UserProvider userProvider = UserProvider.instantiate(conf);
// login the server principal (if using secure Hadoop)
boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
&& userProvider.isHBaseSecurityEnabled();
if (securityEnabled) {
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.thrift.dns.interface", "default"),
conf.get("hbase.thrift.dns.nameserver", "default")));
userProvider.login("hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", host);
}
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
String stringQop = conf.get(THRIFT_QOP_KEY);
SaslUtil.QualityOfProtection qop = null;
if (stringQop != null) {
qop = SaslUtil.getQop(stringQop);
if (!securityEnabled) {
throw new IOException("Thrift server must run in secure mode to support authentication");
}
// Extract the name from the principal
name = SecurityUtil.getUserFromPrincipal(conf.get("hbase.thrift.kerberos.principal"));
}
boolean nonblocking = cmd.hasOption("nonblocking");
boolean hsha = cmd.hasOption("hsha");
boolean selector = cmd.hasOption("selector");
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
String implType = getImplType(nonblocking, hsha, selector);
conf.set("hbase.regionserver.thrift.server.type", implType);
conf.setInt("hbase.regionserver.thrift.port", listenPort);
registerFilters(conf);
// Construct correct ProtocolFactory
boolean compact = cmd.hasOption("compact") ||
conf.getBoolean("hbase.regionserver.thrift.compact", false);
TProtocolFactory protocolFactory = getTProtocolFactory(compact);
final ThriftHBaseServiceHandler hbaseHandler =
new ThriftHBaseServiceHandler(conf, userProvider);
THBaseService.Iface handler =
ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
final THBaseService.Processor p = new THBaseService.Processor(handler);
conf.setBoolean("hbase.regionserver.thrift.compact", compact);
TProcessor processor = p;
boolean framed = cmd.hasOption("framed") ||
conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
InetSocketAddress inetSocketAddress = bindToPort(bindAddress, listenPort);
conf.setBoolean("hbase.regionserver.thrift.framed", framed);
if (qop != null) {
// Create a processor wrapper, to get the caller
processor = new TProcessor() {
@Override
public boolean process(TProtocol inProt,
TProtocol outProt) throws TException {
TSaslServerTransport saslServerTransport =
(TSaslServerTransport)inProt.getTransport();
SaslServer saslServer = saslServerTransport.getSaslServer();
String principal = saslServer.getAuthorizationID();
hbaseHandler.setEffectiveUser(principal);
return p.process(inProt, outProt);
}
};
}
if (cmd.hasOption("w")) {
workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
}
if (cmd.hasOption("s")) {
selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
}
if (cmd.hasOption("q")) {
maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
}
// check for user-defined info server port setting, if so override the conf
try {
if (cmd.hasOption("infoport")) {
String val = cmd.getOptionValue("infoport");
conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
log.debug("Web UI port set to " + val);
}
} catch (NumberFormatException e) {
log.error("Could not parse the value provided for the infoport option", e);
printUsage();
System.exit(1);
}
// Put up info server.
startInfoServer(conf);
final TServer tserver = getServer(workerThreads, selectorThreads, maxCallQueueSize, readTimeout,
backlog, nonblocking, hsha, selector, metrics, protocolFactory, processor,
transportFactory, inetSocketAddress);
realUser.doAs(
new PrivilegedAction<Object>() {
@Override
public Object run() {
pauseMonitor.start();
try {
tserver.serve();
return null;
} finally {
pauseMonitor.stop();
}
}
});
// when tserver.stop eventually happens we'll get here.
return 0;
}
private String getImplType(boolean nonblocking, boolean hsha, boolean selector) {
String implType = "threadpool";
if (nonblocking) {
implType = "nonblocking";
} else if (hsha) {
implType = "hsha";
} else if (selector) {
implType = "selector";
}
return implType;
}
private String getBindAddress(Configuration conf, CommandLine cmd) {
String bindAddress;
if (cmd.hasOption("bind")) {
bindAddress = cmd.getOptionValue("bind");
conf.set("hbase.thrift.info.bindAddress", bindAddress);
} else {
bindAddress = conf.get("hbase.thrift.info.bindAddress");
}
return bindAddress;
}
private int getListenPort(Configuration conf, CommandLine cmd) {
int listenPort;
try {
if (cmd.hasOption("port")) {
listenPort = Integer.parseInt(cmd.getOptionValue("port"));
} else {
listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
}
} catch (NumberFormatException e) {
throw new RuntimeException("Could not parse the value provided for the port option", e);
}
return listenPort;
}
private int getReadTimeout(Configuration conf, CommandLine cmd) {
int readTimeout;
if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
try {
readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
} catch (NumberFormatException e) {
throw new RuntimeException("Could not parse the value provided for the timeout option", e);
}
} else {
readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
}
return readTimeout;
}
private void startInfoServer(Configuration conf) throws IOException {
int port = conf.getInt("hbase.thrift.info.port", 9095);
if (port >= 0) {
conf.setLong("startcode", System.currentTimeMillis());
String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
infoServer.setAttribute("hbase.conf", conf);
infoServer.start();
}
}
private TServer getServer(int workerThreads, int selectorThreads, int maxCallQueueSize,
int readTimeout, int backlog, boolean nonblocking, boolean hsha, boolean selector,
ThriftMetrics metrics, TProtocolFactory protocolFactory, TProcessor processor,
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress)
throws TTransportException {
TServer server;
if (nonblocking) {
server = getTNonBlockingServer(protocolFactory, processor, transportFactory,
inetSocketAddress);
} else if (hsha) {
server = getTHsHaServer(protocolFactory, processor, transportFactory, workerThreads,
maxCallQueueSize, inetSocketAddress, metrics);
} else if (selector) {
server = getTThreadedSelectorServer(protocolFactory, processor, transportFactory,
workerThreads, selectorThreads, maxCallQueueSize, inetSocketAddress, metrics);
} else {
server = getTThreadPoolServer(protocolFactory, processor, transportFactory, workerThreads,
inetSocketAddress, backlog, readTimeout, metrics);
}
return server;
}
}

@@ -20,10 +20,10 @@
<%@ page contentType="text/html;charset=UTF-8"
  import="org.apache.hadoop.conf.Configuration"
  import="org.apache.hadoop.hbase.HBaseConfiguration"
-  import="org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType"
  import="org.apache.hadoop.hbase.util.VersionInfo"
  import="java.util.Date"
%>
+<%@ page import="org.apache.hadoop.hbase.thrift.ImplType" %>
<%
Configuration conf = (Configuration)getServletContext().getAttribute("hbase.conf");

@@ -17,8 +17,9 @@
 */
package org.apache.hadoop.hbase.thrift;
+import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
@@ -65,7 +66,7 @@ public class TestThriftHttpServer {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestThriftHttpServer.class);
-  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private Thread httpServerThread;
  private volatile Exception httpServerException;
@@ -97,18 +98,17 @@ public class TestThriftHttpServer {
    conf.set("hbase.thrift.security.qop", "privacy");
    conf.setBoolean("hbase.thrift.ssl.enabled", false);
-    ThriftServerRunner runner = null;
+    ThriftServer server = null;
    ExpectedException thrown = ExpectedException.none();
    try {
      thrown.expect(IllegalArgumentException.class);
      thrown.expectMessage("Thrift HTTP Server's QoP is privacy, " +
          "but hbase.thrift.ssl.enabled is false");
-      runner = new ThriftServerRunner(conf);
+      server = new ThriftServer(conf);
+      server.run();
      fail("Thrift HTTP Server starts up even with wrong security configurations.");
    } catch (Exception e) {
    }
-    assertNull(runner);
  }

  private void startHttpServerThread(final String[] args) {
@@ -117,7 +117,7 @@ public class TestThriftHttpServer {
    httpServerException = null;
    httpServerThread = new Thread(() -> {
      try {
-        thriftServer.doMain(args);
+        thriftServer.run(args);
      } catch (Exception e) {
        httpServerException = e;
      }
@@ -145,6 +145,10 @@ public class TestThriftHttpServer {
    runThriftServer(1024 * 64);
  }

+  protected ThriftServer createThriftServer() {
+    return new ThriftServer(TEST_UTIL.getConfiguration());
+  }
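createThriftServer() is now a protected factory hook, so the same HTTP test flow can be pointed
at another server implementation. A hypothetical subclass sketch (the subclass name is
illustrative, not part of this commit):

  // Hypothetical: reruns the inherited HTTP tests against the thrift2 server.
  public class TestThrift2HttpServerSketch extends TestThriftHttpServer {
    @Override
    protected ThriftServer createThriftServer() {
      // thrift2's ThriftServer extends the thrift1 ThriftServer, so the return type still fits.
      return new org.apache.hadoop.hbase.thrift2.ThriftServer(TEST_UTIL.getConfiguration());
    }
  }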
  @Test
  public void testRunThriftServer() throws Exception {
    runThriftServer(0);
@@ -153,14 +157,14 @@ public class TestThriftHttpServer {
  void runThriftServer(int customHeaderSize) throws Exception {
    List<String> args = new ArrayList<>(3);
    port = HBaseTestingUtility.randomFreePort();
-    args.add("-" + ThriftServer.PORT_OPTION);
+    args.add("-" + PORT_OPTION);
    args.add(String.valueOf(port));
-    args.add("-" + ThriftServer.INFOPORT_OPTION);
+    args.add("-" + INFOPORT_OPTION);
    int infoPort = HBaseTestingUtility.randomFreePort();
    args.add(String.valueOf(infoPort));
    args.add("start");
-    thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
+    thriftServer = createThriftServer();
    startHttpServerThread(args.toArray(new String[args.size()]));
    // wait up to 10s for the server to start
@@ -195,9 +199,9 @@ public class TestThriftHttpServer {
    Assert.assertEquals(HttpURLConnection.HTTP_FORBIDDEN, conn.getResponseCode());
  }

-  static volatile boolean tableCreated = false;
+  protected static volatile boolean tableCreated = false;

-  void talkToThriftServer(String url, int customHeaderSize) throws Exception {
+  protected void talkToThriftServer(String url, int customHeaderSize) throws Exception {
    THttpClient httpClient = new THttpClient(url);
    httpClient.open();

@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.hbase.thrift;
+import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -46,7 +47,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
@@ -71,7 +71,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Unit testing for ThriftServerRunner.HBaseHandler, a part of the
+ * Unit testing for ThriftServerRunner.HBaseServiceHandler, a part of the
 * org.apache.hadoop.hbase.thrift package.
 */
@Category({ClientTests.class, LargeTests.class})
@@ -113,7 +113,7 @@ public class TestThriftServer {
  @BeforeClass
  public static void beforeClass() throws Exception {
-    UTIL.getConfiguration().setBoolean(ThriftServerRunner.COALESCE_INC_KEY, true);
+    UTIL.getConfiguration().setBoolean(COALESCE_INC_KEY, true);
    UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
    UTIL.getConfiguration().setInt("hbase.client.retries.number", 3);
    UTIL.startMiniCluster();
@@ -152,8 +152,8 @@ public class TestThriftServer {
   * IllegalArgument exception.
   */
  public void doTestTableCreateDrop() throws Exception {
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    doTestTableCreateDrop(handler);
  }
@@ -163,7 +163,7 @@ public class TestThriftServer {
    dropTestTables(handler);
  }
-  public static final class MySlowHBaseHandler extends ThriftServerRunner.HBaseHandler
+  public static final class MySlowHBaseHandler extends ThriftHBaseServiceHandler
      implements Hbase.Iface {
    protected MySlowHBaseHandler(Configuration c)
@@ -230,7 +230,7 @@ public class TestThriftServer {
  private static Hbase.Iface getHandlerForMetricsTest(ThriftMetrics metrics, Configuration conf)
      throws Exception {
    Hbase.Iface handler = new MySlowHBaseHandler(conf);
-    return HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
+    return HbaseHandlerMetricsProxy.newInstance((ThriftHBaseServiceHandler)handler, metrics, conf);
  }
  private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
@@ -270,15 +270,15 @@ public class TestThriftServer {
  }
  public void doTestIncrements() throws Exception {
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    createTestTables(handler);
    doTestIncrements(handler);
    dropTestTables(handler);
  }
-  public static void doTestIncrements(HBaseHandler handler) throws Exception {
+  public static void doTestIncrements(ThriftHBaseServiceHandler handler) throws Exception {
    List<Mutation> mutations = new ArrayList<>(1);
    mutations.add(new Mutation(false, columnAAname, valueEname, true));
    mutations.add(new Mutation(false, columnAname, valueEname, true));
@@ -318,8 +318,8 @@ public class TestThriftServer {
   * versions.
   */
  public void doTestTableMutations() throws Exception {
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    doTestTableMutations(handler);
  }
@@ -395,8 +395,8 @@ public class TestThriftServer {
   */
  public void doTestTableTimestampsAndColumns() throws Exception {
    // Setup
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    handler.createTable(tableAname, getColumnDescriptors());
@@ -473,8 +473,8 @@ public class TestThriftServer {
   */
  public void doTestTableScanners() throws Exception {
    // Setup
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    handler.createTable(tableAname, getColumnDescriptors());
@@ -592,8 +592,8 @@ public class TestThriftServer {
   * Tests for GetTableRegions
   */
  public void doTestGetTableRegions() throws Exception {
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    doTestGetTableRegions(handler);
  }
@@ -620,7 +620,7 @@ public class TestThriftServer {
    conf.set("hbase.thrift.filters", "MyFilter:filterclass");
-    ThriftServerRunner.registerFilters(conf);
+    ThriftServer.registerFilters(conf);
    Map<String, String> registeredFilters = ParseFilter.getAllFilters();
@@ -628,8 +628,8 @@ public class TestThriftServer {
  }
  public void doTestGetRegionInfo() throws Exception {
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    doTestGetRegionInfo(handler);
  }
@@ -655,8 +655,8 @@ public class TestThriftServer {
   * Appends the value to a cell and checks that the cell value is updated properly.
   */
  public static void doTestAppend() throws Exception {
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    handler.createTable(tableAname, getColumnDescriptors());
    try {
@@ -687,8 +687,8 @@ public class TestThriftServer {
   * the checkAndPut succeeds.
   */
  public static void doTestCheckAndPut() throws Exception {
-    ThriftServerRunner.HBaseHandler handler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler handler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    handler.createTable(tableAname, getColumnDescriptors());
    try {
@@ -729,8 +729,8 @@ public class TestThriftServer {
    Configuration conf = UTIL.getConfiguration();
    ThriftMetrics metrics = getMetrics(conf);
-    ThriftServerRunner.HBaseHandler hbaseHandler =
-      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
+    ThriftHBaseServiceHandler hbaseHandler =
+      new ThriftHBaseServiceHandler(UTIL.getConfiguration(),
        UserProvider.instantiate(UTIL.getConfiguration()));
    Hbase.Iface handler = HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, conf);
@@ -863,10 +863,10 @@ public class TestThriftServer {
   * the scanner.
   *
   * @param scannerId the scanner to close
-   * @param handler the HBaseHandler interfacing to HBase
+   * @param handler the HBaseServiceHandler interfacing to HBase
   */
  private void closeScanner(
-      int scannerId, ThriftServerRunner.HBaseHandler handler) throws Exception {
+      int scannerId, ThriftHBaseServiceHandler handler) throws Exception {
    handler.scannerGet(scannerId);
    handler.scannerClose(scannerId);
  }

@@ -17,6 +17,11 @@
 */
package org.apache.hadoop.hbase.thrift;
+import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,7 +33,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -68,12 +72,12 @@ public class TestThriftServerCmdLine {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestThriftServerCmdLine.class);
-  private final ImplType implType;
-  private boolean specifyFramed;
-  private boolean specifyBindIP;
-  private boolean specifyCompact;
+  protected final ImplType implType;
+  protected boolean specifyFramed;
+  protected boolean specifyBindIP;
+  protected boolean specifyCompact;
-  private static final HBaseTestingUtility TEST_UTIL =
+  protected static final HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();
  private Thread cmdLineThread;
@@ -81,8 +85,8 @@ public class TestThriftServerCmdLine {
  private Exception clientSideException;
-  private ThriftServer thriftServer;
-  private int port;
+  private volatile ThriftServer thriftServer;
+  protected int port;
  @Parameters
  public static Collection<Object[]> getParameters() {
@@ -143,8 +147,9 @@ public class TestThriftServerCmdLine {
      @Override
      public void run() {
        try {
-          thriftServer.doMain(args);
+          thriftServer.run(args);
        } catch (Exception e) {
+          LOG.error("Error when start thrift server", e);
          cmdLineException = e;
        }
      }
@@ -154,6 +159,10 @@ public class TestThriftServerCmdLine {
    cmdLineThread.start();
  }

+  protected ThriftServer createThriftServer() {
+    return new ThriftServer(TEST_UTIL.getConfiguration());
+  }

  @Test
  public void testRunThriftServer() throws Exception {
    List<String> args = new ArrayList<>();
@@ -163,37 +172,37 @@ public class TestThriftServerCmdLine {
      args.add(serverTypeOption);
    }
    port = HBaseTestingUtility.randomFreePort();
-    args.add("-" + ThriftServer.PORT_OPTION);
+    args.add("-" + PORT_OPTION);
    args.add(String.valueOf(port));
-    args.add("-" + ThriftServer.INFOPORT_OPTION);
+    args.add("-" + INFOPORT_OPTION);
    int infoPort = HBaseTestingUtility.randomFreePort();
    args.add(String.valueOf(infoPort));
    if (specifyFramed) {
-      args.add("-" + ThriftServer.FRAMED_OPTION);
+      args.add("-" + FRAMED_OPTION);
    }
    if (specifyBindIP) {
-      args.add("-" + ThriftServer.BIND_OPTION);
+      args.add("-" + BIND_OPTION);
      args.add(InetAddress.getLocalHost().getHostName());
    }
    if (specifyCompact) {
-      args.add("-" + ThriftServer.COMPACT_OPTION);
+      args.add("-" + COMPACT_OPTION);
    }
    args.add("start");
-    thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
+    thriftServer = createThriftServer();
    startCmdLineThread(args.toArray(new String[args.size()]));
    // wait up to 10s for the server to start
    for (int i = 0; i < 100
-        && (thriftServer.serverRunner == null || thriftServer.serverRunner.tserver == null); i++) {
+        && (thriftServer.tserver == null); i++) {
      Thread.sleep(100);
    }
    Class<? extends TServer> expectedClass = implType != null ?
        implType.serverClass : TBoundedThreadPoolServer.class;
    assertEquals(expectedClass,
-        thriftServer.serverRunner.tserver.getClass());
+        thriftServer.tserver.getClass());
    try {
      talkToThriftServer();
@@ -210,9 +219,9 @@ public class TestThriftServerCmdLine {
    }
  }
-  private static volatile boolean tableCreated = false;
+  protected static volatile boolean tableCreated = false;
-  private void talkToThriftServer() throws Exception {
+  protected void talkToThriftServer() throws Exception {
    TSocket sock = new TSocket(InetAddress.getLocalHost().getHostName(),
        port);
    TTransport transport = sock;
@@ -228,6 +237,7 @@ public class TestThriftServerCmdLine {
    } else {
      prot = new TBinaryProtocol(transport);
    }
    Hbase.Client client = new Hbase.Client(prot);
    if (!tableCreated){
      TestThriftServer.createTestTables(client);

@@ -18,12 +18,7 @@
 */
package org.apache.hadoop.hbase.thrift;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_KERBEROS_PRINCIPAL_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_PRINCIPAL_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SUPPORT_PROXYUSER_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.USE_HTTP_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -133,14 +128,14 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServer {
    HBaseKerberosUtils.setSecuredConfiguration(conf, serverPrincipal, spnegoServerPrincipal);
    conf.setBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, true);
-    conf.setBoolean(USE_HTTP_CONF_KEY, true);
+    conf.setBoolean(Constants.USE_HTTP_CONF_KEY, true);
    conf.set("hadoop.proxyuser.hbase.hosts", "*");
    conf.set("hadoop.proxyuser.hbase.groups", "*");
-    conf.set(THRIFT_KERBEROS_PRINCIPAL_KEY, serverPrincipal);
-    conf.set(THRIFT_KEYTAB_FILE_KEY, serverKeytab.getAbsolutePath());
-    conf.set(THRIFT_SPNEGO_PRINCIPAL_KEY, spnegoServerPrincipal);
-    conf.set(THRIFT_SPNEGO_KEYTAB_FILE_KEY, spnegoServerKeytab.getAbsolutePath());
+    conf.set(Constants.THRIFT_KERBEROS_PRINCIPAL_KEY, serverPrincipal);
+    conf.set(Constants.THRIFT_KEYTAB_FILE_KEY, serverKeytab.getAbsolutePath());
+    conf.set(Constants.THRIFT_SPNEGO_PRINCIPAL_KEY, spnegoServerPrincipal);
+    conf.set(Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY, spnegoServerKeytab.getAbsolutePath());
  }
  @BeforeClass
@@ -170,7 +165,7 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServer {
    spnegoServerKeytab = new File(keytabDir, spnegoServerPrincipal.replace('/', '_') + ".keytab");
    setupUser(kdc, spnegoServerKeytab, spnegoServerPrincipal);
-    TEST_UTIL.getConfiguration().setBoolean(USE_HTTP_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setBoolean(Constants.USE_HTTP_CONF_KEY, true);
    TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
    addSecurityConfigurations(TEST_UTIL.getConfiguration());
@@ -191,7 +186,7 @@ public class TestThriftSpnegoHttpServer extends TestThriftHttpServer {
  }
  @Override
-  void talkToThriftServer(String url, int customHeaderSize) throws Exception {
+  protected void talkToThriftServer(String url, int customHeaderSize) throws Exception {
    // Close httpClient and THttpClient automatically on any failures
    try (
        CloseableHttpClient httpClient = createHttpClient();

View File: TestThrift2HttpServer.java

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift2;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.thrift.TestThriftHttpServer;
import org.apache.hadoop.hbase.thrift2.generated.TColumnFamilyDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.TTableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.THttpClient;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, MediumTests.class})
public class TestThrift2HttpServer extends TestThriftHttpServer {
private static final String TABLENAME = "TestThrift2HttpServerTable";
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestThrift2HttpServer.class);
@Override
protected ThriftServer createThriftServer() {
return new ThriftServer(TEST_UTIL.getConfiguration());
}
@Override
protected void talkToThriftServer(String url, int customHeaderSize) throws Exception {
THttpClient httpClient = new THttpClient(url);
httpClient.open();
if (customHeaderSize > 0) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < customHeaderSize; i++) {
sb.append("a");
}
httpClient.setCustomHeader("User-Agent", sb.toString());
}
try {
TProtocol prot;
prot = new TBinaryProtocol(httpClient);
THBaseService.Client client = new THBaseService.Client(prot);
TTableName tTableName = new TTableName();
tTableName.setNs(Bytes.toBytes(""));
tTableName.setQualifier(Bytes.toBytes(TABLENAME));
if (!tableCreated){
Assert.assertTrue(!client.tableExists(tTableName));
TTableDescriptor tTableDescriptor = new TTableDescriptor();
tTableDescriptor.setTableName(tTableName);
TColumnFamilyDescriptor columnFamilyDescriptor = new TColumnFamilyDescriptor();
columnFamilyDescriptor.setName(Bytes.toBytes(TABLENAME));
tTableDescriptor.addToColumns(columnFamilyDescriptor);
client.createTable(tTableDescriptor, new ArrayList<>());
tableCreated = true;
}
Assert.assertTrue(client.tableExists(tTableName));
} finally {
httpClient.close();
}
}
}

View File: TestThrift2ServerCmdLine.java

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift2;
import java.net.InetAddress;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.thrift.ImplType;
import org.apache.hadoop.hbase.thrift.TestThriftServerCmdLine;
import org.apache.hadoop.hbase.thrift2.generated.TColumnFamilyDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
import org.apache.hadoop.hbase.thrift2.generated.TTableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, MediumTests.class})
public class TestThrift2ServerCmdLine extends TestThriftServerCmdLine {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestThrift2ServerCmdLine.class);
private static final String TABLENAME = "TestThrift2ServerCmdLineTable";
@Override
protected ThriftServer createThriftServer() {
return new ThriftServer(TEST_UTIL.getConfiguration());
}
public TestThrift2ServerCmdLine(ImplType implType, boolean specifyFramed,
boolean specifyBindIP, boolean specifyCompact) {
super(implType, specifyFramed, specifyBindIP, specifyCompact);
}
@Override
protected void talkToThriftServer() throws Exception {
TSocket sock = new TSocket(InetAddress.getLocalHost().getHostName(),
port);
TTransport transport = sock;
if (specifyFramed || implType.isAlwaysFramed()) {
transport = new TFramedTransport(transport);
}
sock.open();
try {
TProtocol tProtocol;
if (specifyCompact) {
tProtocol = new TCompactProtocol(transport);
} else {
tProtocol = new TBinaryProtocol(transport);
}
THBaseService.Client client = new THBaseService.Client(tProtocol);
TTableName tTableName = new TTableName();
tTableName.setNs(Bytes.toBytes(""));
tTableName.setQualifier(Bytes.toBytes(TABLENAME));
if (!tableCreated){
Assert.assertTrue(!client.tableExists(tTableName));
TTableDescriptor tTableDescriptor = new TTableDescriptor();
tTableDescriptor.setTableName(tTableName);
TColumnFamilyDescriptor columnFamilyDescriptor = new TColumnFamilyDescriptor();
columnFamilyDescriptor.setName(Bytes.toBytes(TABLENAME));
tTableDescriptor.addToColumns(columnFamilyDescriptor);
client.createTable(tTableDescriptor, new ArrayList<>());
tableCreated = true;
}
Assert.assertTrue(client.tableExists(tTableName));
} finally {
sock.close();
}
}
}
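
Both thrift2 test classes above repeat the same create-table-if-absent handshake against THBaseService. Were that duplication ever factored out, a shared helper along the following lines would do — a hypothetical sketch, not part of this commit, built only from the thrift2 client calls already used above (it assumes the same imports as the surrounding tests plus org.apache.thrift.TException, and it re-checks table existence instead of relying on the static tableCreated flag):

  // Hypothetical shared helper: create the test table on first use and assert that it is
  // visible through the thrift2 THBaseService client.
  private static void ensureTestTable(THBaseService.Client client, String tableName)
      throws TException {
    TTableName tTableName = new TTableName();
    tTableName.setNs(Bytes.toBytes(""));
    tTableName.setQualifier(Bytes.toBytes(tableName));
    if (!client.tableExists(tTableName)) {
      TTableDescriptor tTableDescriptor = new TTableDescriptor();
      tTableDescriptor.setTableName(tTableName);
      TColumnFamilyDescriptor columnFamilyDescriptor = new TColumnFamilyDescriptor();
      columnFamilyDescriptor.setName(Bytes.toBytes(tableName));
      tTableDescriptor.addToColumns(columnFamilyDescriptor);
      client.createTable(tTableDescriptor, new ArrayList<>());
    }
    Assert.assertTrue(client.tableExists(tTableName));
  }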

View File: TestThriftHBaseServiceHandler.java

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.thrift2;

import static java.nio.ByteBuffer.wrap;
+import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.CLEANUP_INTERVAL;
+import static org.apache.hadoop.hbase.thrift.HBaseServiceHandler.MAX_IDLETIME;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.deleteFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.getFromThrift;
import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.incrementFromThrift;
@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.thrift.ErrorThrowingGetObserver;
+import org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.TAppend;
import org.apache.hadoop.hbase.thrift2.generated.TColumn;
@ -115,7 +118,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
- * Unit testing for ThriftServer.HBaseHandler, a part of the org.apache.hadoop.hbase.thrift2
+ * Unit testing for ThriftServer.HBaseServiceHandler, a part of the org.apache.hadoop.hbase.thrift2
 * package.
 */
@Category({ClientTests.class, MediumTests.class})
@ -749,8 +752,8 @@ public class TestThriftHBaseServiceHandler {
    int cleanUpInterval = 100;
    Configuration conf = new Configuration(UTIL.getConfiguration());
    // Set the ConnectionCache timeout to trigger halfway through the trials
-   conf.setInt(ThriftHBaseServiceHandler.MAX_IDLETIME, (numTrials / 2) * trialPause);
-   conf.setInt(ThriftHBaseServiceHandler.CLEANUP_INTERVAL, cleanUpInterval);
+   conf.setInt(MAX_IDLETIME, (numTrials / 2) * trialPause);
+   conf.setInt(CLEANUP_INTERVAL, cleanUpInterval);
    ThriftHBaseServiceHandler handler = new ThriftHBaseServiceHandler(conf,
        UserProvider.instantiate(conf));
@ -1206,7 +1209,7 @@ public class TestThriftHBaseServiceHandler {
    ThriftMetrics metrics = getMetrics(conf);
    ThriftHBaseServiceHandler hbaseHandler = createHandler();
    THBaseService.Iface handler =
-       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
+       HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, conf);

    byte[] rowName = Bytes.toBytes("testMetrics");
    ByteBuffer table = wrap(tableAname);
@ -1249,7 +1252,7 @@ public class TestThriftHBaseServiceHandler {
    ThriftHBaseServiceHandler hbaseHandler = createHandler();
    ThriftMetrics metrics = getMetrics(UTIL.getConfiguration());
    THBaseService.Iface handler =
-       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
+       HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, null);

    ByteBuffer tTableName = wrap(tableName.getName());
    // check metrics increment with a successful get
@ -1323,7 +1326,7 @@ public class TestThriftHBaseServiceHandler {
    ThriftHBaseServiceHandler hbaseHandler = createHandler();
    ThriftMetrics metrics = getMetrics(UTIL.getConfiguration());
    THBaseService.Iface handler =
-       ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
+       HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, null);

    ByteBuffer tTableName = wrap(tableName.getName());
    // check metrics latency with a successful get