HBASE-4863 Phabricator D531 Make Thrift server thread pool bounded and add a command-line UI test

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1206267 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-11-25 17:26:40 +00:00
parent 3ba5e7b318
commit 91a19bf59a
8 changed files with 1043 additions and 165 deletions

View File

@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.thrift;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A bounded thread pool server customized for HBase.
*/
public class TBoundedThreadPoolServer extends TServer {
private static final String QUEUE_FULL_MSG =
"Queue is full, closing connection";
/**
* The "core size" of the thread pool. New threads are created on every
* connection until this many threads are created.
*/
public static final String MIN_WORKER_THREADS_CONF_KEY =
"hbase.thrift.minWorkerThreads";
/**
* This default core pool size should be enough for many test scenarios. We
* want to override this with a much larger number (e.g. at least 200) for a
* large-scale production setup.
*/
public static final int DEFAULT_MIN_WORKER_THREADS = 16;
/**
* The maximum size of the thread pool. When the pending request queue
* overflows, new threads are created until their number reaches this number.
* After that, the server starts dropping connections.
*/
public static final String MAX_WORKER_THREADS_CONF_KEY =
"hbase.thrift.maxWorkerThreads";
public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
/**
* The maximum number of pending connections waiting in the queue. If there
* are no idle threads in the pool, the server queues requests. Only when
* the queue overflows, new threads are added, up to
* hbase.thrift.maxQueuedRequests threads.
*/
public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
"hbase.thrift.maxQueuedRequests";
public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
/**
* Default amount of time in seconds to keep a thread alive. Worker threads
* are stopped after being idle for this long.
*/
public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY =
"hbase.thrift.threadKeepAliveTimeSec";
private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
/**
* Time to wait after interrupting all worker threads. This is after a clean
* shutdown has been attempted.
*/
public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
private static final Log LOG = LogFactory.getLog(
TBoundedThreadPoolServer.class.getName());
public static class Args extends TThreadPoolServer.Args {
int maxQueuedRequests;
int threadKeepAliveTimeSec;
public Args(TServerTransport transport, Configuration conf) {
super(transport);
minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
DEFAULT_MIN_WORKER_THREADS);
maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
DEFAULT_MAX_WORKER_THREADS);
maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
DEFAULT_MAX_QUEUED_REQUESTS);
threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
}
@Override
public String toString() {
return "min worker threads=" + minWorkerThreads
+ ", max worker threads=" + maxWorkerThreads
+ ", max queued requests=" + maxQueuedRequests;
}
}
/** Executor service for handling client connections */
private ExecutorService executorService;
/** Flag for stopping the server */
private volatile boolean stopped;
private Args serverOptions;
public TBoundedThreadPoolServer(Args options) {
super(options);
BlockingQueue<Runnable> executorQueue;
if (options.maxQueuedRequests > 0) {
executorQueue = new LinkedBlockingQueue<Runnable>(
options.maxQueuedRequests);
} else {
executorQueue = new SynchronousQueue<Runnable>();
}
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setDaemon(true);
tfb.setNameFormat("thrift-worker-%d");
executorService =
new ThreadPoolExecutor(options.minWorkerThreads,
options.maxWorkerThreads, DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS,
executorQueue, tfb.build());
serverOptions = options;
}
public void serve() {
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
LOG.error("Error occurred during listening.", ttx);
return;
}
Runtime.getRuntime().addShutdownHook(
new Thread(getClass().getSimpleName() + "-shutdown-hook") {
@Override
public void run() {
TBoundedThreadPoolServer.this.stop();
}
});
stopped = false;
while (!stopped && !Thread.interrupted()) {
TTransport client = null;
try {
client = serverTransport_.accept();
} catch (TTransportException ttx) {
if (!stopped) {
LOG.warn("Transport error when accepting message", ttx);
continue;
} else {
// The server has been stopped
break;
}
}
ClientConnnection command = new ClientConnnection(client);
try {
executorService.execute(command);
} catch (RejectedExecutionException rex) {
if (client.getClass() == TSocket.class) {
// We expect the client to be TSocket.
LOG.warn(QUEUE_FULL_MSG + " from " +
((TSocket) client).getSocket().getRemoteSocketAddress());
} else {
LOG.warn(QUEUE_FULL_MSG, rex);
}
client.close();
}
}
shutdownServer();
}
/**
* Loop until {@link ExecutorService#awaitTermination} finally does return
* without an interrupted exception. If we don't do this, then we'll shut
* down prematurely. We want to let the executor service clear its task
* queue, closing client sockets appropriately.
*/
private void shutdownServer() {
executorService.shutdown();
long msLeftToWait =
serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
long timeMillis = System.currentTimeMillis();
LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
" pending requests");
boolean interrupted = false;
while (msLeftToWait >= 0) {
try {
executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long timePassed = System.currentTimeMillis() - timeMillis;
msLeftToWait -= timePassed;
timeMillis += timePassed;
interrupted = true;
}
}
LOG.info("Interrupting all worker threads and waiting for "
+ TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
// This will interrupt all the threads, even those running a task.
executorService.shutdownNow();
Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
// Preserve the interrupted status.
if (interrupted) {
Thread.currentThread().interrupt();
}
LOG.info("Thrift server shutdown complete");
}
@Override
public void stop() {
stopped = true;
serverTransport_.interrupt();
}
private class ClientConnnection implements Runnable {
private TTransport client;
/**
* Default constructor.
*
* @param client Transport to process
*/
private ClientConnnection(TTransport client) {
this.client = client;
}
/**
* Loops on processing a client forever
*/
public void run() {
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
try {
processor = processorFactory_.getProcessor(client);
inputTransport = inputTransportFactory_.getTransport(client);
outputTransport = outputTransportFactory_.getTransport(client);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown.
while (!stopped && processor.process(inputProtocol, outputProtocol)) {}
} catch (TTransportException ttx) {
// Assume the client died and continue silently
} catch (TException tx) {
LOG.error("Thrift error occurred during processing of message.", tx);
} catch (Exception x) {
LOG.error("Error occurred during processing of message.", x);
}
if (inputTransport != null) {
inputTransport.close();
}
if (outputTransport != null) {
outputTransport.close();
}
}
}
}

View File

@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -65,6 +64,8 @@ import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation; import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase; import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface;
import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor;
import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.Mutation; import org.apache.hadoop.hbase.thrift.generated.Mutation;
@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.thrift.TException; import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TCompactProtocol;
@ -83,7 +85,7 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer; import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TServer.AbstractServerArgs;
import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingServerTransport;
@ -91,12 +93,113 @@ import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory; import org.apache.thrift.transport.TTransportFactory;
import com.google.common.base.Joiner;
/** /**
* ThriftServer - this class starts up a Thrift server which implements the * ThriftServer - this class starts up a Thrift server which implements the
* Hbase API specified in the Hbase.thrift IDL file. * Hbase API specified in the Hbase.thrift IDL file.
*/ */
public class ThriftServer { public class ThriftServer {
private static final Log LOG = LogFactory.getLog(ThriftServer.class);
private static final String MIN_WORKERS_OPTION = "minWorkers";
private static final String MAX_WORKERS_OPTION = "workers";
private static final String MAX_QUEUE_SIZE_OPTION = "queue";
private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
static final String BIND_OPTION = "bind";
static final String COMPACT_OPTION = "compact";
static final String FRAMED_OPTION = "framed";
static final String PORT_OPTION = "port";
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
private static final int DEFAULT_LISTEN_PORT = 9090;
private Configuration conf;
TServer server;
/** An enum of server implementation selections */
enum ImplType {
HS_HA("hsha", true, THsHaServer.class, false),
NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true);
public static final ImplType DEFAULT = THREAD_POOL;
final String option;
final boolean isAlwaysFramed;
final Class<? extends TServer> serverClass;
final boolean canSpecifyBindIP;
ImplType(String option, boolean isAlwaysFramed,
Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
this.option = option;
this.isAlwaysFramed = isAlwaysFramed;
this.serverClass = serverClass;
this.canSpecifyBindIP = canSpecifyBindIP;
}
/**
* @return <code>-option</code> so we can get the list of options from
* {@link #values()}
*/
@Override
public String toString() {
return "-" + option;
}
String getDescription() {
StringBuilder sb = new StringBuilder("Use the " +
serverClass.getSimpleName());
if (isAlwaysFramed) {
sb.append(" This implies the framed transport.");
}
if (this == DEFAULT) {
sb.append("This is the default.");
}
return sb.toString();
}
static OptionGroup createOptionGroup() {
OptionGroup group = new OptionGroup();
for (ImplType t : values()) {
group.addOption(new Option(t.option, t.getDescription()));
}
return group;
}
static ImplType getServerImpl(CommandLine cmd) {
ImplType chosenType = null;
int numChosen = 0;
for (ImplType t : values()) {
if (cmd.hasOption(t.option)) {
chosenType = t;
++numChosen;
}
}
if (numChosen != 1) {
throw new AssertionError("Exactly one option out of " +
Arrays.toString(values()) + " has to be specified");
}
return chosenType;
}
public String simpleClassName() {
return serverClass.getSimpleName();
}
public static List<String> serversThatCannotSpecifyBindIP() {
List<String> l = new ArrayList<String>();
for (ImplType t : values()) {
if (!t.canSpecifyBindIP) {
l.add(t.simpleClassName());
}
}
return l;
}
}
/** /**
* The HBaseHandler is a glue object that connects Thrift RPC calls to the * The HBaseHandler is a glue object that connects Thrift RPC calls to the
* HBase client API primarily defined in the HBaseAdmin and HTable objects. * HBase client API primarily defined in the HBaseAdmin and HTable objects.
@ -713,84 +816,85 @@ public class ThriftServer {
@Override @Override
public List<TRowResult> scannerGetList(int id,int nbRows) throws IllegalArgument, IOError { public List<TRowResult> scannerGetList(int id,int nbRows) throws IllegalArgument, IOError {
LOG.debug("scannerGetList: id=" + id); LOG.debug("scannerGetList: id=" + id);
ResultScanner scanner = getScanner(id); ResultScanner scanner = getScanner(id);
if (null == scanner) { if (null == scanner) {
throw new IllegalArgument("scanner ID is invalid"); throw new IllegalArgument("scanner ID is invalid");
} }
Result [] results = null; Result [] results = null;
try { try {
results = scanner.next(nbRows); results = scanner.next(nbRows);
if (null == results) { if (null == results) {
return new ArrayList<TRowResult>(); return new ArrayList<TRowResult>();
}
} catch (IOException e) {
throw new IOError(e.getMessage());
} }
return ThriftUtilities.rowResultFromHBase(results); } catch (IOException e) {
throw new IOError(e.getMessage());
}
return ThriftUtilities.rowResultFromHBase(results);
} }
@Override @Override
public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError { public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
return scannerGetList(id,1); return scannerGetList(id,1);
} }
public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError { public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError {
try { try {
HTable table = getTable(tableName); HTable table = getTable(tableName);
Scan scan = new Scan(); Scan scan = new Scan();
if (tScan.isSetStartRow()) { if (tScan.isSetStartRow()) {
scan.setStartRow(tScan.getStartRow()); scan.setStartRow(tScan.getStartRow());
} }
if (tScan.isSetStopRow()) { if (tScan.isSetStopRow()) {
scan.setStopRow(tScan.getStopRow()); scan.setStopRow(tScan.getStopRow());
} }
if (tScan.isSetTimestamp()) { if (tScan.isSetTimestamp()) {
scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp()); scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
} }
if (tScan.isSetCaching()) { if (tScan.isSetCaching()) {
scan.setCaching(tScan.getCaching()); scan.setCaching(tScan.getCaching());
} }
if(tScan.isSetColumns() && tScan.getColumns().size() != 0) { if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
for(ByteBuffer column : tScan.getColumns()) { for(ByteBuffer column : tScan.getColumns()) {
byte [][] famQf = KeyValue.parseColumn(getBytes(column)); byte [][] famQf = KeyValue.parseColumn(getBytes(column));
if(famQf.length == 1) { if(famQf.length == 1) {
scan.addFamily(famQf[0]); scan.addFamily(famQf[0]);
} else { } else {
scan.addColumn(famQf[0], famQf[1]); scan.addColumn(famQf[0], famQf[1]);
}
} }
} }
if (tScan.isSetFilterString()) {
ParseFilter parseFilter = new ParseFilter();
scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
}
return addScanner(table.getScanner(scan));
} catch (IOException e) {
throw new IOError(e.getMessage());
} }
if (tScan.isSetFilterString()) {
ParseFilter parseFilter = new ParseFilter();
scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
}
return addScanner(table.getScanner(scan));
} catch (IOException e) {
throw new IOError(e.getMessage());
}
} }
@Override @Override
public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
List<ByteBuffer> columns) throws IOError { List<ByteBuffer> columns) throws IOError {
try { try {
HTable table = getTable(tableName); HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow)); Scan scan = new Scan(getBytes(startRow));
if(columns != null && columns.size() != 0) { if(columns != null && columns.size() != 0) {
for(ByteBuffer column : columns) { for(ByteBuffer column : columns) {
byte [][] famQf = KeyValue.parseColumn(getBytes(column)); byte [][] famQf = KeyValue.parseColumn(getBytes(column));
if(famQf.length == 1) { if(famQf.length == 1) {
scan.addFamily(famQf[0]); scan.addFamily(famQf[0]);
} else { } else {
scan.addColumn(famQf[0], famQf[1]); scan.addColumn(famQf[0], famQf[1]);
}
} }
} }
return addScanner(table.getScanner(scan));
} catch (IOException e) {
throw new IOError(e.getMessage());
} }
return addScanner(table.getScanner(scan));
} catch (IOException e) {
throw new IOError(e.getMessage());
}
} }
@Override @Override
@ -826,7 +930,7 @@ public class ThriftServer {
Filter f = new WhileMatchFilter( Filter f = new WhileMatchFilter(
new PrefixFilter(getBytes(startAndPrefix))); new PrefixFilter(getBytes(startAndPrefix)));
scan.setFilter(f); scan.setFilter(f);
if(columns != null && columns.size() != 0) { if (columns != null && columns.size() != 0) {
for(ByteBuffer column : columns) { for(ByteBuffer column : columns) {
byte [][] famQf = KeyValue.parseColumn(getBytes(column)); byte [][] famQf = KeyValue.parseColumn(getBytes(column));
if(famQf.length == 1) { if(famQf.length == 1) {
@ -849,8 +953,8 @@ public class ThriftServer {
HTable table = getTable(tableName); HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow)); Scan scan = new Scan(getBytes(startRow));
scan.setTimeRange(Long.MIN_VALUE, timestamp); scan.setTimeRange(Long.MIN_VALUE, timestamp);
if(columns != null && columns.size() != 0) { if (columns != null && columns.size() != 0) {
for(ByteBuffer column : columns) { for (ByteBuffer column : columns) {
byte [][] famQf = KeyValue.parseColumn(getBytes(column)); byte [][] famQf = KeyValue.parseColumn(getBytes(column));
if(famQf.length == 1) { if(famQf.length == 1) {
scan.addFamily(famQf[0]); scan.addFamily(famQf[0]);
@ -873,8 +977,8 @@ public class ThriftServer {
HTable table = getTable(tableName); HTable table = getTable(tableName);
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
scan.setTimeRange(Long.MIN_VALUE, timestamp); scan.setTimeRange(Long.MIN_VALUE, timestamp);
if(columns != null && columns.size() != 0) { if (columns != null && columns.size() != 0) {
for(ByteBuffer column : columns) { for (ByteBuffer column : columns) {
byte [][] famQf = KeyValue.parseColumn(getBytes(column)); byte [][] famQf = KeyValue.parseColumn(getBytes(column));
if(famQf.length == 1) { if(famQf.length == 1) {
scan.addFamily(famQf[0]); scan.addFamily(famQf[0]);
@ -911,7 +1015,7 @@ public class ThriftServer {
} }
@Override @Override
public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row, public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
ByteBuffer family) throws IOError { ByteBuffer family) throws IOError {
try { try {
HTable table = getTable(getBytes(tableName)); HTable table = getTable(getBytes(tableName));
@ -966,69 +1070,98 @@ public class ThriftServer {
} }
} }
public ThriftServer(Configuration conf) {
this.conf = HBaseConfiguration.create(conf);
}
// //
// Main program and support routines // Main program and support routines
// //
private static void printUsageAndExit(Options options, int exitCode) { private static void printUsageAndExit(Options options, int exitCode)
throws ExitCodeException {
HelpFormatter formatter = new HelpFormatter(); HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Thrift", null, options, formatter.printHelp("Thrift", null, options,
"To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" + "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" +
"To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" + "To shutdown the thrift server run 'bin/hbase-daemon.sh stop " +
" send a kill signal to the thrift server pid", "thrift' or send a kill signal to the thrift server pid",
true); true);
System.exit(exitCode); throw new ExitCodeException(exitCode, "");
} }
private static final String DEFAULT_LISTEN_PORT = "9090";
/* /*
* Start up the Thrift server. * Start up or shuts down the Thrift server, depending on the arguments.
* @param args * @param args
*/ */
static private void doMain(final String[] args) throws Exception { void doMain(final String[] args) throws Exception {
Log LOG = LogFactory.getLog("ThriftServer");
Options options = new Options(); Options options = new Options();
options.addOption("b", "bind", true, "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]"); options.addOption("b", BIND_OPTION, true, "Address to bind " +
options.addOption("p", "port", true, "Port to bind to [default: 9090]"); "the Thrift server to. Not supported by the Nonblocking and " +
options.addOption("f", "framed", false, "Use framed transport"); "HsHa server [default: " + DEFAULT_BIND_ADDR + "]");
options.addOption("c", "compact", false, "Use the compact protocol"); options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
DEFAULT_LISTEN_PORT + "]");
options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information"); options.addOption("h", "help", false, "Print help information");
OptionGroup servers = new OptionGroup(); options.addOption("m", MIN_WORKERS_OPTION, true,
servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); "The minimum number of worker threads for " +
servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport.")); ImplType.THREAD_POOL.simpleClassName());
servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
options.addOptionGroup(servers); options.addOption("w", MAX_WORKERS_OPTION, true,
"The maximum number of worker threads for " +
ImplType.THREAD_POOL.simpleClassName());
options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
"The maximum number of queued requests in " +
ImplType.THREAD_POOL.simpleClassName());
options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
"The amount of time in secods to keep a thread alive when idle in " +
ImplType.THREAD_POOL.simpleClassName());
options.addOptionGroup(ImplType.createOptionGroup());
CommandLineParser parser = new PosixParser(); CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args); CommandLine cmd = parser.parse(options, args);
/** // This is so complicated to please both bin/hbase and bin/hbase-daemon.
* This is so complicated to please both bin/hbase and bin/hbase-daemon. // hbase-daemon provides "start" and "stop" arguments
* hbase-daemon provides "start" and "stop" arguments // hbase should print the help if no argument is provided
* hbase should print the help if no argument is provided
*/
List<String> commandLine = Arrays.asList(args); List<String> commandLine = Arrays.asList(args);
boolean stop = commandLine.contains("stop"); boolean stop = commandLine.contains("stop");
boolean start = commandLine.contains("start"); boolean start = commandLine.contains("start");
if (cmd.hasOption("help") || !start || stop) { boolean invalidStartStop = (start && stop) || (!start && !stop);
if (cmd.hasOption("help") || invalidStartStop) {
if (invalidStartStop) {
LOG.error("Exactly one of 'start' and 'stop' has to be specified");
}
printUsageAndExit(options, 1); printUsageAndExit(options, 1);
} }
// Get port to bind to // Get port to bind to
int listenPort = 0; int listenPort = 0;
try { try {
listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT)); listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION,
String.valueOf(DEFAULT_LISTEN_PORT)));
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the port option", e); LOG.error("Could not parse the value provided for the port option", e);
printUsageAndExit(options, -1); printUsageAndExit(options, -1);
} }
// Make optional changes to the configuration based on command-line options
optionToConf(cmd, MIN_WORKERS_OPTION,
conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
optionToConf(cmd, MAX_WORKERS_OPTION,
conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY);
optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
// Construct correct ProtocolFactory // Construct correct ProtocolFactory
TProtocolFactory protocolFactory; TProtocolFactory protocolFactory;
if (cmd.hasOption("compact")) { if (cmd.hasOption(COMPACT_OPTION)) {
LOG.debug("Using compact protocol"); LOG.debug("Using compact protocol");
protocolFactory = new TCompactProtocol.Factory(); protocolFactory = new TCompactProtocol.Factory();
} else { } else {
@ -1036,78 +1169,125 @@ public class ThriftServer {
protocolFactory = new TBinaryProtocol.Factory(); protocolFactory = new TBinaryProtocol.Factory();
} }
HBaseHandler handler = new HBaseHandler(); HBaseHandler handler = new HBaseHandler(conf);
Hbase.Processor processor = new Hbase.Processor(handler); Hbase.Processor<Hbase.Iface> processor =
new Hbase.Processor<Hbase.Iface>(handler);
ImplType implType = ImplType.getServerImpl(cmd);
TServer server; // Construct correct TransportFactory
if (cmd.hasOption("nonblocking") || cmd.hasOption("hsha")) { TTransportFactory transportFactory;
if (cmd.hasOption("bind")) { if (cmd.hasOption(FRAMED_OPTION) || implType.isAlwaysFramed) {
LOG.error("The Nonblocking and HsHa servers don't support IP address binding at the moment." + transportFactory = new TFramedTransport.Factory();
" See https://issues.apache.org/jira/browse/HBASE-2155 for details."); LOG.debug("Using framed transport");
printUsageAndExit(options, -1); } else {
transportFactory = new TTransportFactory();
}
if (cmd.hasOption(BIND_OPTION) && !implType.canSpecifyBindIP) {
LOG.error("Server types " + Joiner.on(", ").join(
ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
"address binding at the moment. See " +
"https://issues.apache.org/jira/browse/HBASE-2155 for details.");
printUsageAndExit(options, -1);
}
if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING) {
if (cmd.hasOption(BIND_OPTION)) {
throw new RuntimeException("-" + BIND_OPTION + " not supported with " +
implType);
} }
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(listenPort); TNonblockingServerTransport serverTransport =
TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(); new TNonblockingServerSocket(listenPort);
if (cmd.hasOption("nonblocking")) { if (implType == ImplType.NONBLOCKING) {
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport); TNonblockingServer.Args serverArgs =
serverArgs.processor(processor); new TNonblockingServer.Args(serverTransport);
serverArgs.transportFactory(transportFactory); setServerArgs(serverArgs, processor, transportFactory,
serverArgs.protocolFactory(protocolFactory); protocolFactory);
LOG.info("starting HBase Nonblocking Thrift server on " + Integer.toString(listenPort));
server = new TNonblockingServer(serverArgs); server = new TNonblockingServer(serverArgs);
} else { } else {
THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
serverArgs.processor(processor); serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory); serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory); serverArgs.protocolFactory(protocolFactory);
LOG.info("starting HBase HsHA Thrift server on " + Integer.toString(listenPort));
server = new THsHaServer(serverArgs); server = new THsHaServer(serverArgs);
} }
LOG.info("starting HBase " + implType.simpleClassName() +
" server on " + Integer.toString(listenPort));
} else if (implType == ImplType.THREAD_POOL) {
// Thread pool server. Get the IP address to bind to.
InetAddress listenAddress = getBindAddress(options, cmd);
TServerTransport serverTransport = new TServerSocket(
new InetSocketAddress(listenAddress, listenPort));
TBoundedThreadPoolServer.Args serverArgs = new TBoundedThreadPoolServer.Args(
serverTransport, conf);
setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+ listenAddress + ":" + Integer.toString(listenPort)
+ "; " + serverArgs);
server = new TBoundedThreadPoolServer(serverArgs);
} else { } else {
// Get IP address to bind to throw new AssertionError("Unsupported Thrift server implementation: " +
InetAddress listenAddress = null; implType.simpleClassName());
if (cmd.hasOption("bind")) { }
try {
listenAddress = InetAddress.getByName(cmd.getOptionValue("bind"));
} catch (UnknownHostException e) {
LOG.error("Could not bind to provided ip address", e);
printUsageAndExit(options, -1);
}
} else {
listenAddress = InetAddress.getByName("0.0.0.0");
}
TServerTransport serverTransport = new TServerSocket(new InetSocketAddress(listenAddress, listenPort));
// Construct correct TransportFactory // A sanity check that we instantiated the right type of server.
TTransportFactory transportFactory; if (server.getClass() != implType.serverClass) {
if (cmd.hasOption("framed")) { throw new AssertionError("Expected to create Thrift server class " +
transportFactory = new TFramedTransport.Factory(); implType.serverClass.getName() + " but got " +
LOG.debug("Using framed transport"); server.getClass().getName());
} else {
transportFactory = new TTransportFactory();
}
TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
serverArgs.processor(processor);
serverArgs.protocolFactory(protocolFactory);
serverArgs.transportFactory(transportFactory);
LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort));
server = new TThreadPoolServer(serverArgs);
} }
server.serve(); server.serve();
} }
public void stop() {
server.stop();
}
private InetAddress getBindAddress(Options options, CommandLine cmd)
throws ExitCodeException {
InetAddress listenAddress = null;
String bindAddressStr = cmd.getOptionValue(BIND_OPTION, DEFAULT_BIND_ADDR);
try {
listenAddress = InetAddress.getByName(bindAddressStr);
} catch (UnknownHostException e) {
LOG.error("Could not resolve the bind address specified: " +
bindAddressStr, e);
printUsageAndExit(options, -1);
}
return listenAddress;
}
private static void setServerArgs(AbstractServerArgs<?> serverArgs,
Processor<Iface> processor, TTransportFactory transportFactory,
TProtocolFactory protocolFactory) {
serverArgs.processor(processor);
serverArgs.transportFactory(transportFactory);
serverArgs.protocolFactory(protocolFactory);
}
private static void optionToConf(CommandLine cmd, String option,
Configuration conf, String destConfKey) {
if (cmd.hasOption(option)) {
conf.set(destConfKey, cmd.getOptionValue(option));
}
}
/** /**
* @param args * @param args
* @throws Exception * @throws Exception
*/ */
public static void main(String [] args) throws Exception { public static void main(String [] args) throws Exception {
VersionInfo.logVersion(); VersionInfo.logVersion();
doMain(args); try {
new ThriftServer(HBaseConfiguration.create()).doMain(args);
} catch (ExitCodeException ex) {
System.exit(ex.getExitCode());
}
} }
} }

View File

@ -25,6 +25,7 @@ import java.io.PrintWriter;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.TimeUnit;
/** /**
* Thread Utility * Thread Utility
@ -127,4 +128,28 @@ public class Threads {
e.printStackTrace(); e.printStackTrace();
} }
} }
/**
* Sleeps for the given amount of time even if interrupted. Preserves
* the interrupt status.
* @param msToWait the amount of time to sleep in milliseconds
*/
public static void sleepWithoutInterrupt(final long msToWait) {
long timeMillis = System.currentTimeMillis();
long endTime = timeMillis + msToWait;
boolean interrupted = false;
while (timeMillis < endTime) {
try {
Thread.sleep(endTime - timeMillis);
} catch (InterruptedException ex) {
interrupted = true;
}
timeMillis = System.currentTimeMillis();
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} }

View File

@ -824,4 +824,31 @@
(You will have to restart your cluster after setting it). (You will have to restart your cluster after setting it).
</description> </description>
</property> </property>
<property>
<name>hbase.thrift.minWorkerThreads</name>
<value>16</value>
<description>
The "core size" of the thread pool. New threads are created on every
connection until this many threads are created.
</description>
</property>
<property>
<name>hbase.thrift.maxWorkerThreads</name>
<value>1000</value>
<description>
The maximum size of the thread pool. When the pending request queue
overflows, new threads are created until their number reaches this number.
After that, the server starts dropping connections.
</description>
</property>
<property>
<name>hbase.thrift.maxQueuedRequests</name>
<value>1000</value>
<description>
The maximum number of pending Thrift connections waiting in the queue. If
there are no idle threads in the pool, the server queues requests. Only
when the queue overflows, new threads are added, up to
hbase.thrift.maxQueuedRequests threads.
</description>
</property>
</configuration> </configuration>

View File

@ -25,6 +25,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.net.ServerSocket;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -1714,4 +1715,31 @@ public class HBaseTestingUtility {
return table; return table;
} }
private static final int MIN_RANDOM_PORT = 0xc000;
private static final int MAX_RANDOM_PORT = 0xfffe;
/**
* Returns a random port. These ports cannot be registered with IANA and are
* intended for dynamic allocation (see http://bit.ly/dynports).
*/
public static int randomPort() {
return MIN_RANDOM_PORT
+ new Random().nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT);
}
public static int randomFreePort() {
int port = 0;
do {
port = randomPort();
try {
ServerSocket sock = new ServerSocket(port);
sock.close();
} catch (IOException ex) {
port = 0;
}
} while (port == 0);
return port;
}
} }

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.thrift;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation; import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.Mutation; import org.apache.hadoop.hbase.thrift.generated.Mutation;
import org.apache.hadoop.hbase.thrift.generated.TCell; import org.apache.hadoop.hbase.thrift.generated.TCell;
import org.apache.hadoop.hbase.thrift.generated.TRowResult; import org.apache.hadoop.hbase.thrift.generated.TRowResult;
@ -47,20 +47,22 @@ import org.junit.experimental.categories.Category;
public class TestThriftServer { public class TestThriftServer {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected static final int MAXVERSIONS = 3; protected static final int MAXVERSIONS = 3;
private static ByteBuffer $bb(String i) {
private static ByteBuffer asByteBuffer(String i) {
return ByteBuffer.wrap(Bytes.toBytes(i)); return ByteBuffer.wrap(Bytes.toBytes(i));
} }
// Static names for tables, columns, rows, and values // Static names for tables, columns, rows, and values
private static ByteBuffer tableAname = $bb("tableA"); private static ByteBuffer tableAname = asByteBuffer("tableA");
private static ByteBuffer tableBname = $bb("tableB"); private static ByteBuffer tableBname = asByteBuffer("tableB");
private static ByteBuffer columnAname = $bb("columnA:"); private static ByteBuffer columnAname = asByteBuffer("columnA:");
private static ByteBuffer columnBname = $bb("columnB:"); private static ByteBuffer columnBname = asByteBuffer("columnB:");
private static ByteBuffer rowAname = $bb("rowA"); private static ByteBuffer rowAname = asByteBuffer("rowA");
private static ByteBuffer rowBname = $bb("rowB"); private static ByteBuffer rowBname = asByteBuffer("rowB");
private static ByteBuffer valueAname = $bb("valueA"); private static ByteBuffer valueAname = asByteBuffer("valueA");
private static ByteBuffer valueBname = $bb("valueB"); private static ByteBuffer valueBname = asByteBuffer("valueB");
private static ByteBuffer valueCname = $bb("valueC"); private static ByteBuffer valueCname = asByteBuffer("valueC");
private static ByteBuffer valueDname = $bb("valueD"); private static ByteBuffer valueDname = asByteBuffer("valueD");
@BeforeClass @BeforeClass
public static void beforeClass() throws Exception { public static void beforeClass() throws Exception {
@ -100,7 +102,11 @@ public class TestThriftServer {
public void doTestTableCreateDrop() throws Exception { public void doTestTableCreateDrop() throws Exception {
ThriftServer.HBaseHandler handler = ThriftServer.HBaseHandler handler =
new ThriftServer.HBaseHandler(UTIL.getConfiguration()); new ThriftServer.HBaseHandler(UTIL.getConfiguration());
createTestTables(handler);
dropTestTables(handler);
}
public static void createTestTables(Hbase.Iface handler) throws Exception {
// Create/enable/disable/delete tables, ensure methods act correctly // Create/enable/disable/delete tables, ensure methods act correctly
assertEquals(handler.getTableNames().size(), 0); assertEquals(handler.getTableNames().size(), 0);
handler.createTable(tableAname, getColumnDescriptors()); handler.createTable(tableAname, getColumnDescriptors());
@ -109,6 +115,9 @@ public class TestThriftServer {
assertTrue(handler.isTableEnabled(tableAname)); assertTrue(handler.isTableEnabled(tableAname));
handler.createTable(tableBname, new ArrayList<ColumnDescriptor>()); handler.createTable(tableBname, new ArrayList<ColumnDescriptor>());
assertEquals(handler.getTableNames().size(), 2); assertEquals(handler.getTableNames().size(), 2);
}
public static void dropTestTables(Hbase.Iface handler) throws Exception {
handler.disableTable(tableBname); handler.disableTable(tableBname);
assertFalse(handler.isTableEnabled(tableBname)); assertFalse(handler.isTableEnabled(tableBname));
handler.deleteTable(tableBname); handler.deleteTable(tableBname);
@ -121,7 +130,7 @@ public class TestThriftServer {
handler.disableTable(tableAname);*/ handler.disableTable(tableAname);*/
handler.deleteTable(tableAname); handler.deleteTable(tableAname);
} }
/** /**
* Tests adding a series of Mutations and BatchMutations, including a * Tests adding a series of Mutations and BatchMutations, including a
* delete mutation. Also tests data retrieval, and getting back multiple * delete mutation. Also tests data retrieval, and getting back multiple
@ -343,7 +352,7 @@ public class TestThriftServer {
handler.disableTable(tableAname); handler.disableTable(tableAname);
handler.deleteTable(tableAname); handler.deleteTable(tableAname);
} }
/** /**
* For HBASE-2556 * For HBASE-2556
* Tests for GetTableRegions * Tests for GetTableRegions
@ -357,19 +366,19 @@ public class TestThriftServer {
int regionCount = handler.getTableRegions(tableAname).size(); int regionCount = handler.getTableRegions(tableAname).size();
assertEquals("empty table should have only 1 region, " + assertEquals("empty table should have only 1 region, " +
"but found " + regionCount, regionCount, 1); "but found " + regionCount, regionCount, 1);
handler.disableTable(tableAname); handler.disableTable(tableAname);
handler.deleteTable(tableAname); handler.deleteTable(tableAname);
regionCount = handler.getTableRegions(tableAname).size(); regionCount = handler.getTableRegions(tableAname).size();
assertEquals("non-existing table should have 0 region, " + assertEquals("non-existing table should have 0 region, " +
"but found " + regionCount, regionCount, 0); "but found " + regionCount, regionCount, 0);
} }
/** /**
* *
* @return a List of ColumnDescriptors for use in creating a table. Has one * @return a List of ColumnDescriptors for use in creating a table. Has one
* default ColumnDescriptor and one ColumnDescriptor with fewer versions * default ColumnDescriptor and one ColumnDescriptor with fewer versions
*/ */
private List<ColumnDescriptor> getColumnDescriptors() { private static List<ColumnDescriptor> getColumnDescriptors() {
ArrayList<ColumnDescriptor> cDescriptors = new ArrayList<ColumnDescriptor>(); ArrayList<ColumnDescriptor> cDescriptors = new ArrayList<ColumnDescriptor>();
// A default ColumnDescriptor // A default ColumnDescriptor

View File

@ -0,0 +1,230 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.thrift;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.thrift.ThriftServer.ImplType;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Joiner;
/**
* Start the HBase Thrift server on a random port through the command-line
* interface and talk to it from client side.
*/
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestThriftServerCmdLine {
public static final Log LOG =
LogFactory.getLog(TestThriftServerCmdLine.class);
private final ImplType implType;
private boolean specifyFramed;
private boolean specifyBindIP;
private boolean specifyCompact;
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private Thread cmdLineThread;
private volatile Exception cmdLineException;
private Exception clientSideException;
private ThriftServer thriftServer;
private int port;
@Parameters
public static Collection<Object[]> getParameters() {
Collection<Object[]> parameters = new ArrayList<Object[]>();
for (ThriftServer.ImplType implType : ThriftServer.ImplType.values()) {
for (boolean specifyFramed : new boolean[] {false, true}) {
for (boolean specifyBindIP : new boolean[] {false, true}) {
if (specifyBindIP && !implType.canSpecifyBindIP) {
continue;
}
for (boolean specifyCompact : new boolean[] {false, true}) {
parameters.add(new Object[]{implType, new Boolean(specifyFramed),
new Boolean(specifyBindIP), new Boolean(specifyCompact)});
}
}
}
}
return parameters;
}
public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed,
boolean specifyBindIP, boolean specifyCompact) {
this.implType = implType;
this.specifyFramed = specifyFramed;
this.specifyBindIP = specifyBindIP;
this.specifyCompact = specifyCompact;
LOG.debug("implType=" + implType + ", " +
"specifyFramed=" + specifyFramed + ", " +
"specifyBindIP=" + specifyBindIP + ", " +
"specifyCompact=" + specifyCompact);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
private void startCmdLineThread(final String[] args) {
LOG.info("Starting HBase Thrift server with command line: " +
Joiner.on(" ").join(args));
cmdLineException = null;
cmdLineThread = new Thread(new Runnable() {
@Override
public void run() {
try {
thriftServer.doMain(args);
} catch (Exception e) {
cmdLineException = e;
}
}
});
cmdLineThread.setName(ThriftServer.class.getSimpleName() +
"-cmdline");
cmdLineThread.start();
}
@Test(timeout=30 * 1000)
public void testRunThriftServer() throws Exception {
List<String> args = new ArrayList<String>();
if (implType != null) {
String serverTypeOption = implType.toString();
assertTrue(serverTypeOption.startsWith("-"));
args.add(serverTypeOption);
}
port = HBaseTestingUtility.randomFreePort();
args.add("-" + ThriftServer.PORT_OPTION);
args.add(String.valueOf(port));
if (specifyFramed) {
args.add("-" + ThriftServer.FRAMED_OPTION);
}
if (specifyBindIP) {
args.add("-" + ThriftServer.BIND_OPTION);
args.add(InetAddress.getLocalHost().getHostName());
}
if (specifyCompact) {
args.add("-" + ThriftServer.COMPACT_OPTION);
}
args.add("start");
thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());
startCmdLineThread(args.toArray(new String[0]));
Threads.sleepWithoutInterrupt(2000);
try {
talkToThriftServer();
} catch (Exception ex) {
clientSideException = ex;
} finally {
stopCmdLineThread();
}
Class<? extends TServer> expectedClass;
if (implType != null) {
expectedClass = implType.serverClass;
} else {
expectedClass = TBoundedThreadPoolServer.class;
}
assertEquals(expectedClass, thriftServer.server.getClass());
if (clientSideException != null) {
LOG.error("Thrift client threw an exception", clientSideException);
throw new Exception(clientSideException);
}
}
private void talkToThriftServer() throws Exception {
TSocket sock = new TSocket(InetAddress.getLocalHost().getHostName(),
port);
TTransport transport = sock;
if (specifyFramed || implType.isAlwaysFramed) {
transport = new TFramedTransport(transport);
}
sock.open();
TProtocol prot;
if (specifyCompact) {
prot = new TCompactProtocol(transport);
} else {
prot = new TBinaryProtocol(transport);
}
Hbase.Client client = new Hbase.Client(prot);
List<ByteBuffer> tableNames = client.getTableNames();
if (tableNames.isEmpty()) {
TestThriftServer.createTestTables(client);
assertEquals(2, client.getTableNames().size());
} else {
assertEquals(2, tableNames.size());
assertEquals(2, client.getColumnDescriptors(tableNames.get(0)).size());
}
sock.close();
}
private void stopCmdLineThread() throws Exception {
LOG.debug("Stopping " + implType.simpleClassName() + " Thrift server");
thriftServer.stop();
cmdLineThread.join();
if (cmdLineException != null) {
LOG.error("Command-line invocation of HBase Thrift server threw an " +
"exception", cmdLineException);
throw new Exception(cmdLineException);
}
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
public class TestThreads {
private static final Log LOG = LogFactory.getLog(TestThreads.class);
private static final int SLEEP_TIME_MS = 5000;
private static final int TOLERANCE_MS = (int) (0.05 * SLEEP_TIME_MS);
private volatile boolean wasInterrupted;
@Test(timeout=6000)
public void testSleepWithoutInterrupt() throws InterruptedException {
Thread sleeper = new Thread(new Runnable() {
@Override
public void run() {
LOG.debug("Sleeper thread: sleeping for " + SLEEP_TIME_MS);
Threads.sleepWithoutInterrupt(SLEEP_TIME_MS);
LOG.debug("Sleeper thread: finished sleeping");
wasInterrupted = Thread.currentThread().isInterrupted();
}
});
LOG.debug("Starting sleeper thread (" + SLEEP_TIME_MS + " ms)");
sleeper.start();
long startTime = System.currentTimeMillis();
LOG.debug("Main thread: sleeping for 500 ms");
Threads.sleep(500);
LOG.debug("Interrupting the sleeper thread and sleeping for 2000 ms");
sleeper.interrupt();
Threads.sleep(2000);
LOG.debug("Interrupting the sleeper thread and sleeping for 1000 ms");
sleeper.interrupt();
Threads.sleep(1000);
LOG.debug("Interrupting the sleeper thread again");
sleeper.interrupt();
sleeper.join();
assertTrue("sleepWithoutInterrupt did not preserve the thread's " +
"interrupted status", wasInterrupted);
long timeElapsed = System.currentTimeMillis() - startTime;
assertTrue("Elapsed time " + timeElapsed + " ms is out of the expected " +
"range of the sleep time " + SLEEP_TIME_MS,
Math.abs(timeElapsed - SLEEP_TIME_MS) < TOLERANCE_MS);
LOG.debug("Target sleep time: " + SLEEP_TIME_MS + ", time elapsed: " +
timeElapsed);
}
}