HBASE-14926 Hung ThriftServer; no timeout on read from client; if client crashes, worker thread gets stuck reading

This commit is contained in:
stack 2015-12-04 13:19:12 -08:00
parent d18c1af3bb
commit 26dd0d17f8
4 changed files with 70 additions and 19 deletions

View File

@ -14,21 +14,24 @@ Example code.
to be able to compile/run the examples without Thrift installed. to be able to compile/run the examples without Thrift installed.
If desired, the code can be re-generated as follows: If desired, the code can be re-generated as follows:
thrift --gen cpp --gen java --gen rb --gen py --gen php --gen perl \ thrift --gen cpp --gen java --gen rb --gen py --gen php --gen perl \
${HBASE_ROOT}/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift ${HBASE_ROOT}/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
and re-placed at the corresponding paths. and re-placed at the corresponding paths. You should not have to do this generally.
Before you run any Thrift examples, find a running HBase Thrift server. Before you run any Thrift examples, find a running HBase Thrift server (and a running
If you start one locally (bin/hbase thrift start), the default port is 9090. hbase cluster for this server to talk to -- at a minimum start a standalone instance
by doing ./bin/start-hbase.sh). If you start one locally (bin/hbase thrift start),
the default port is 9090 (a web UI showing basic stats is also served, by default on port 9095).
* Java: org.apache.hadoop.hbase.thrift.DemoClient (jar under lib/). * Java: org.apache.hadoop.hbase.thrift.DemoClient (jar under lib/).
1. Set up the classpath with all the necessary jars, for example: 1. Make sure your client has all required jars on the CLASSPATH when it starts. If lazy,
for f in `find . -name "libthrift-*.jar" -or -name "slf4j-*.jar" -or -name "log4j-*.jar"`; do just add all jars as follows: {HBASE_EXAMPLE_CLASSPATH=`./bin/hbase classpath`}
HBASE_EXAMPLE_CLASSPATH=${HBASE_EXAMPLE_CLASSPATH}:$f;
done
2. If HBase server is not secure, or authentication is not enabled for the Thrift server, execute: 2. If HBase server is not secure, or authentication is not enabled for the Thrift server, execute:
{java -cp hbase-examples-[VERSION].jar:${HBASE_EXAMPLE_CLASSPATH} org.apache.hadoop.hbase.thrift.DemoClient <host> <port>} {java -cp hbase-examples-[VERSION].jar:${HBASE_EXAMPLE_CLASSPATH} org.apache.hadoop.hbase.thrift.DemoClient <host> <port>}
3. If HBase server is secure, and authentication is enabled for the Thrift server, run kinit at first, then execute: 3. If HBase server is secure, and authentication is enabled for the Thrift server, run kinit at first, then execute:
{java -cp hbase-examples-[VERSION].jar:${HBASE_EXAMPLE_CLASSPATH} org.apache.hadoop.hbase.thrift.DemoClient <host> <port> true} {java -cp hbase-examples-[VERSION].jar:${HBASE_EXAMPLE_CLASSPATH} org.apache.hadoop.hbase.thrift.DemoClient <host> <port> true}
4. Here is a lazy example that simply pulls in all HBase dependency jars and connects to the default location on localhost.
It should work with a standalone hbase instance started by doing ./bin/start-hbase.sh:
{java -cp ./hbase-examples/target/hbase-examples-2.0.0-SNAPSHOT.jar:`./bin/hbase classpath` org.apache.hadoop.hbase.thrift.DemoClient localhost 9090}
* Ruby: hbase-examples/src/main/ruby/DemoClient.rb * Ruby: hbase-examples/src/main/ruby/DemoClient.rb
1. Modify the import path in the file to point to {$THRIFT_HOME}/lib/rb/lib. 1. Modify the import path in the file to point to {$THRIFT_HOME}/lib/rb/lib.

View File

@ -64,6 +64,8 @@ public class ThriftServer {
private InfoServer infoServer; private InfoServer infoServer;
private static final String READ_TIMEOUT_OPTION = "readTimeout";
// //
// Main program and support routines // Main program and support routines
// //
@ -134,6 +136,11 @@ public class ThriftServer {
"The amount of time in secods to keep a thread alive when idle in " + "The amount of time in secods to keep a thread alive when idle in " +
ImplType.THREAD_POOL.simpleClassName()); ImplType.THREAD_POOL.simpleClassName());
options.addOption("t", READ_TIMEOUT_OPTION, true,
"Amount of time in milliseconds before a server thread will timeout " +
"waiting for client to send data on a connected socket. Currently, " +
"only applies to TBoundedThreadPoolServer");
options.addOptionGroup(ImplType.createOptionGroup()); options.addOptionGroup(ImplType.createOptionGroup());
CommandLineParser parser = new PosixParser(); CommandLineParser parser = new PosixParser();
@ -185,7 +192,9 @@ public class ThriftServer {
conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY); conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
optionToConf(cmd, READ_TIMEOUT_OPTION, conf,
ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
// Set general thrift server options // Set general thrift server options
boolean compact = cmd.hasOption(COMPACT_OPTION) || boolean compact = cmd.hasOption(COMPACT_OPTION) ||
conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false); conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false);
@ -194,8 +203,7 @@ public class ThriftServer {
conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false); conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false);
conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed); conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed);
if (cmd.hasOption(BIND_OPTION)) { if (cmd.hasOption(BIND_OPTION)) {
conf.set( conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
} }
ImplType.setServerImpl(cmd, conf); ImplType.setServerImpl(cmd, conf);

View File

@ -160,6 +160,15 @@ public class ThriftServerRunner implements Runnable {
static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password"; static final String THRIFT_SSL_KEYSTORE_PASSWORD = "hbase.thrift.ssl.keystore.password";
static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword"; static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD = "hbase.thrift.ssl.keystore.keypassword";
/**
* Amount of time in milliseconds before a server thread will time out
* waiting for the client to send data on a connected socket. Currently
* applies only to TBoundedThreadPoolServer.
*/
public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
"hbase.thrift.server.socket.read.timeout";
public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
/** /**
* Thrift quality of protection configuration key. Valid values can be: * Thrift quality of protection configuration key. Valid values can be:
@ -522,7 +531,6 @@ public class ThriftServerRunner implements Runnable {
if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING || if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
implType == ImplType.THREADED_SELECTOR) { implType == ImplType.THREADED_SELECTOR) {
InetAddress listenAddress = getBindAddress(conf); InetAddress listenAddress = getBindAddress(conf);
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket( TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
new InetSocketAddress(listenAddress, listenPort)); new InetSocketAddress(listenAddress, listenPort));
@ -563,10 +571,13 @@ public class ThriftServerRunner implements Runnable {
} else if (implType == ImplType.THREAD_POOL) { } else if (implType == ImplType.THREAD_POOL) {
// Thread pool server. Get the IP address to bind to. // Thread pool server. Get the IP address to bind to.
InetAddress listenAddress = getBindAddress(conf); InetAddress listenAddress = getBindAddress(conf);
int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
TServerTransport serverTransport = new TServerSocket( TServerTransport serverTransport = new TServerSocket(
new TServerSocket.ServerSocketTransportArgs(). new TServerSocket.ServerSocketTransportArgs().
bindAddr(new InetSocketAddress(listenAddress, listenPort)).backlog(backlog)); bindAddr(new InetSocketAddress(listenAddress, listenPort)).
backlog(backlog).
clientTimeout(readTimeout));
TBoundedThreadPoolServer.Args serverArgs = TBoundedThreadPoolServer.Args serverArgs =
new TBoundedThreadPoolServer.Args(serverTransport, conf); new TBoundedThreadPoolServer.Args(serverTransport, conf);
@ -575,7 +586,7 @@ public class ThriftServerRunner implements Runnable {
.protocolFactory(protocolFactory); .protocolFactory(protocolFactory);
LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+ listenAddress + ":" + Integer.toString(listenPort) + listenAddress + ":" + Integer.toString(listenPort)
+ "; " + serverArgs); + " with readTimeout " + readTimeout + "ms; " + serverArgs);
TBoundedThreadPoolServer tserver = TBoundedThreadPoolServer tserver =
new TBoundedThreadPoolServer(serverArgs, metrics); new TBoundedThreadPoolServer(serverArgs, metrics);
this.tserver = tserver; this.tserver = tserver;

View File

@ -112,6 +112,16 @@ public class ThriftServer extends Configured implements Tool {
public static final int DEFAULT_LISTEN_PORT = 9090; public static final int DEFAULT_LISTEN_PORT = 9090;
private static final String READ_TIMEOUT_OPTION = "readTimeout";
/**
* Amount of time in milliseconds before a server thread will time out
* waiting for the client to send data on a connected socket. Currently
* applied to the thread-pool server (TThreadPoolServer) socket.
*/
public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
"hbase.thrift.server.socket.read.timeout";
public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
public ThriftServer() { public ThriftServer() {
} }
@ -135,7 +145,10 @@ public class ThriftServer extends Configured implements Tool {
options.addOption("w", "workers", true, "How many worker threads to use."); options.addOption("w", "workers", true, "How many worker threads to use.");
options.addOption("h", "help", false, "Print help information"); options.addOption("h", "help", false, "Print help information");
options.addOption(null, "infoport", true, "Port for web UI"); options.addOption(null, "infoport", true, "Port for web UI");
options.addOption("t", READ_TIMEOUT_OPTION, true,
"Amount of time in milliseconds before a server thread will timeout " +
"waiting for client to send data on a connected socket. Currently, " +
"only applies to TBoundedThreadPoolServer");
OptionGroup servers = new OptionGroup(); OptionGroup servers = new OptionGroup();
servers.addOption( servers.addOption(
new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
@ -275,11 +288,13 @@ public class ThriftServer extends Configured implements Tool {
TTransportFactory transportFactory, TTransportFactory transportFactory,
int workerThreads, int workerThreads,
InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress,
int backlog) int backlog,
int clientTimeout)
throws TTransportException { throws TTransportException {
TServerTransport serverTransport = new TServerSocket( TServerTransport serverTransport = new TServerSocket(
new TServerSocket.ServerSocketTransportArgs(). new TServerSocket.ServerSocketTransportArgs().
bindAddr(inetSocketAddress).backlog(backlog)); bindAddr(inetSocketAddress).backlog(backlog).
clientTimeout(clientTimeout));
log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString()); log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
serverArgs.processor(processor); serverArgs.processor(processor);
@ -347,6 +362,19 @@ public class ThriftServer extends Configured implements Tool {
bindAddress = conf.get("hbase.thrift.info.bindAddress"); bindAddress = conf.get("hbase.thrift.info.bindAddress");
} }
// Get read timeout
int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
try {
readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
} catch (NumberFormatException e) {
throw new RuntimeException("Could not parse the value provided for the timeout option", e);
}
} else {
readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
}
// Get port to bind to // Get port to bind to
int listenPort = 0; int listenPort = 0;
try { try {
@ -488,7 +516,8 @@ public class ThriftServer extends Configured implements Tool {
transportFactory, transportFactory,
workerThreads, workerThreads,
inetSocketAddress, inetSocketAddress,
backlog); backlog,
readTimeout);
} }
final TServer tserver = server; final TServer tserver = server;