diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java b/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java new file mode 100644 index 00000000000..690a57f8b9f --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A bounded thread pool server customized for HBase. + */ +public class TBoundedThreadPoolServer extends TServer { + + private static final String QUEUE_FULL_MSG = + "Queue is full, closing connection"; + + /** + * The "core size" of the thread pool. New threads are created on every + * connection until this many threads are created. + */ + public static final String MIN_WORKER_THREADS_CONF_KEY = + "hbase.thrift.minWorkerThreads"; + + /** + * This default core pool size should be enough for many test scenarios. We + * want to override this with a much larger number (e.g. at least 200) for a + * large-scale production setup. + */ + public static final int DEFAULT_MIN_WORKER_THREADS = 16; + + /** + * The maximum size of the thread pool. When the pending request queue + * overflows, new threads are created until their number reaches this number. + * After that, the server starts dropping connections. + */ + public static final String MAX_WORKER_THREADS_CONF_KEY = + "hbase.thrift.maxWorkerThreads"; + + public static final int DEFAULT_MAX_WORKER_THREADS = 1000; + + /** + * The maximum number of pending connections waiting in the queue. If there + * are no idle threads in the pool, the server queues requests. Only when + * the queue overflows, new threads are added, up to + * hbase.thrift.maxQueuedRequests threads. + */ + public static final String MAX_QUEUED_REQUESTS_CONF_KEY = + "hbase.thrift.maxQueuedRequests"; + + public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000; + + /** + * Default amount of time in seconds to keep a thread alive. Worker threads + * are stopped after being idle for this long. + */ + public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY = + "hbase.thrift.threadKeepAliveTimeSec"; + + private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60; + + /** + * Time to wait after interrupting all worker threads. This is after a clean + * shutdown has been attempted. + */ + public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000; + + private static final Log LOG = LogFactory.getLog( + TBoundedThreadPoolServer.class.getName()); + + public static class Args extends TThreadPoolServer.Args { + int maxQueuedRequests; + int threadKeepAliveTimeSec; + + public Args(TServerTransport transport, Configuration conf) { + super(transport); + minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY, + DEFAULT_MIN_WORKER_THREADS); + maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY, + DEFAULT_MAX_WORKER_THREADS); + maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY, + DEFAULT_MAX_QUEUED_REQUESTS); + threadKeepAliveTimeSec = conf.getInt(THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY, + DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC); + } + + @Override + public String toString() { + return "min worker threads=" + minWorkerThreads + + ", max worker threads=" + maxWorkerThreads + + ", max queued requests=" + maxQueuedRequests; + } + } + + /** Executor service for handling client connections */ + private ExecutorService executorService; + + /** Flag for stopping the server */ + private volatile boolean stopped; + + private Args serverOptions; + + public TBoundedThreadPoolServer(Args options) { + super(options); + + BlockingQueue executorQueue; + if (options.maxQueuedRequests > 0) { + executorQueue = new LinkedBlockingQueue( + options.maxQueuedRequests); + } else { + executorQueue = new SynchronousQueue(); + } + + ThreadFactoryBuilder tfb = new ThreadFactoryBuilder(); + tfb.setDaemon(true); + tfb.setNameFormat("thrift-worker-%d"); + executorService = + new ThreadPoolExecutor(options.minWorkerThreads, + options.maxWorkerThreads, DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS, + executorQueue, tfb.build()); + serverOptions = options; + } + + public void serve() { + try { + serverTransport_.listen(); + } catch (TTransportException ttx) { + LOG.error("Error occurred during listening.", ttx); + return; + } + + Runtime.getRuntime().addShutdownHook( + new Thread(getClass().getSimpleName() + "-shutdown-hook") { + @Override + public void run() { + TBoundedThreadPoolServer.this.stop(); + } + }); + + stopped = false; + while (!stopped && !Thread.interrupted()) { + TTransport client = null; + try { + client = serverTransport_.accept(); + } catch (TTransportException ttx) { + if (!stopped) { + LOG.warn("Transport error when accepting message", ttx); + continue; + } else { + // The server has been stopped + break; + } + } + + ClientConnnection command = new ClientConnnection(client); + try { + executorService.execute(command); + } catch (RejectedExecutionException rex) { + if (client.getClass() == TSocket.class) { + // We expect the client to be TSocket. + LOG.warn(QUEUE_FULL_MSG + " from " + + ((TSocket) client).getSocket().getRemoteSocketAddress()); + } else { + LOG.warn(QUEUE_FULL_MSG, rex); + } + client.close(); + } + } + + shutdownServer(); + } + + /** + * Loop until {@link ExecutorService#awaitTermination} finally does return + * without an interrupted exception. If we don't do this, then we'll shut + * down prematurely. We want to let the executor service clear its task + * queue, closing client sockets appropriately. + */ + private void shutdownServer() { + executorService.shutdown(); + + long msLeftToWait = + serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal); + long timeMillis = System.currentTimeMillis(); + + LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" + + " pending requests"); + boolean interrupted = false; + while (msLeftToWait >= 0) { + try { + executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long timePassed = System.currentTimeMillis() - timeMillis; + msLeftToWait -= timePassed; + timeMillis += timePassed; + interrupted = true; + } + } + + LOG.info("Interrupting all worker threads and waiting for " + + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer"); + + // This will interrupt all the threads, even those running a task. + executorService.shutdownNow(); + Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS); + + // Preserve the interrupted status. + if (interrupted) { + Thread.currentThread().interrupt(); + } + LOG.info("Thrift server shutdown complete"); + } + + @Override + public void stop() { + stopped = true; + serverTransport_.interrupt(); + } + + private class ClientConnnection implements Runnable { + + private TTransport client; + + /** + * Default constructor. + * + * @param client Transport to process + */ + private ClientConnnection(TTransport client) { + this.client = client; + } + + /** + * Loops on processing a client forever + */ + public void run() { + TProcessor processor = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + try { + processor = processorFactory_.getProcessor(client); + inputTransport = inputTransportFactory_.getTransport(client); + outputTransport = outputTransportFactory_.getTransport(client); + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + // we check stopped_ first to make sure we're not supposed to be shutting + // down. this is necessary for graceful shutdown. + while (!stopped && processor.process(inputProtocol, outputProtocol)) {} + } catch (TTransportException ttx) { + // Assume the client died and continue silently + } catch (TException tx) { + LOG.error("Thrift error occurred during processing of message.", tx); + } catch (Exception x) { + LOG.error("Error occurred during processing of message.", x); + } + + if (inputTransport != null) { + inputTransport.close(); + } + + if (outputTransport != null) { + outputTransport.close(); + } + } + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java index beffca7977f..7d79094ca4f 100644 --- a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java +++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; @@ -65,6 +64,8 @@ import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; import org.apache.hadoop.hbase.thrift.generated.BatchMutation; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface; +import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor; import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; import org.apache.hadoop.hbase.thrift.generated.Mutation; @@ -76,6 +77,7 @@ import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; @@ -83,7 +85,7 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.server.TServer.AbstractServerArgs; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -91,12 +93,113 @@ import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportFactory; +import com.google.common.base.Joiner; + /** * ThriftServer - this class starts up a Thrift server which implements the * Hbase API specified in the Hbase.thrift IDL file. */ public class ThriftServer { + private static final Log LOG = LogFactory.getLog(ThriftServer.class); + + private static final String MIN_WORKERS_OPTION = "minWorkers"; + private static final String MAX_WORKERS_OPTION = "workers"; + private static final String MAX_QUEUE_SIZE_OPTION = "queue"; + private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec"; + static final String BIND_OPTION = "bind"; + static final String COMPACT_OPTION = "compact"; + static final String FRAMED_OPTION = "framed"; + static final String PORT_OPTION = "port"; + + private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; + private static final int DEFAULT_LISTEN_PORT = 9090; + + private Configuration conf; + TServer server; + + /** An enum of server implementation selections */ + enum ImplType { + HS_HA("hsha", true, THsHaServer.class, false), + NONBLOCKING("nonblocking", true, TNonblockingServer.class, false), + THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true); + + public static final ImplType DEFAULT = THREAD_POOL; + + final String option; + final boolean isAlwaysFramed; + final Class serverClass; + final boolean canSpecifyBindIP; + + ImplType(String option, boolean isAlwaysFramed, + Class serverClass, boolean canSpecifyBindIP) { + this.option = option; + this.isAlwaysFramed = isAlwaysFramed; + this.serverClass = serverClass; + this.canSpecifyBindIP = canSpecifyBindIP; + } + + /** + * @return -option so we can get the list of options from + * {@link #values()} + */ + @Override + public String toString() { + return "-" + option; + } + + String getDescription() { + StringBuilder sb = new StringBuilder("Use the " + + serverClass.getSimpleName()); + if (isAlwaysFramed) { + sb.append(" This implies the framed transport."); + } + if (this == DEFAULT) { + sb.append("This is the default."); + } + return sb.toString(); + } + + static OptionGroup createOptionGroup() { + OptionGroup group = new OptionGroup(); + for (ImplType t : values()) { + group.addOption(new Option(t.option, t.getDescription())); + } + return group; + } + + static ImplType getServerImpl(CommandLine cmd) { + ImplType chosenType = null; + int numChosen = 0; + for (ImplType t : values()) { + if (cmd.hasOption(t.option)) { + chosenType = t; + ++numChosen; + } + } + if (numChosen != 1) { + throw new AssertionError("Exactly one option out of " + + Arrays.toString(values()) + " has to be specified"); + } + return chosenType; + } + + public String simpleClassName() { + return serverClass.getSimpleName(); + } + + public static List serversThatCannotSpecifyBindIP() { + List l = new ArrayList(); + for (ImplType t : values()) { + if (!t.canSpecifyBindIP) { + l.add(t.simpleClassName()); + } + } + return l; + } + + } + /** * The HBaseHandler is a glue object that connects Thrift RPC calls to the * HBase client API primarily defined in the HBaseAdmin and HTable objects. @@ -713,84 +816,85 @@ public class ThriftServer { @Override public List scannerGetList(int id,int nbRows) throws IllegalArgument, IOError { - LOG.debug("scannerGetList: id=" + id); - ResultScanner scanner = getScanner(id); - if (null == scanner) { - throw new IllegalArgument("scanner ID is invalid"); - } + LOG.debug("scannerGetList: id=" + id); + ResultScanner scanner = getScanner(id); + if (null == scanner) { + throw new IllegalArgument("scanner ID is invalid"); + } - Result [] results = null; - try { - results = scanner.next(nbRows); - if (null == results) { - return new ArrayList(); - } - } catch (IOException e) { - throw new IOError(e.getMessage()); + Result [] results = null; + try { + results = scanner.next(nbRows); + if (null == results) { + return new ArrayList(); } - return ThriftUtilities.rowResultFromHBase(results); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + return ThriftUtilities.rowResultFromHBase(results); } + @Override public List scannerGet(int id) throws IllegalArgument, IOError { - return scannerGetList(id,1); + return scannerGetList(id,1); } public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(); - if (tScan.isSetStartRow()) { - scan.setStartRow(tScan.getStartRow()); - } - if (tScan.isSetStopRow()) { - scan.setStopRow(tScan.getStopRow()); - } - if (tScan.isSetTimestamp()) { - scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp()); - } - if (tScan.isSetCaching()) { - scan.setCaching(tScan.getCaching()); - } - if(tScan.isSetColumns() && tScan.getColumns().size() != 0) { - for(ByteBuffer column : tScan.getColumns()) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } + try { + HTable table = getTable(tableName); + Scan scan = new Scan(); + if (tScan.isSetStartRow()) { + scan.setStartRow(tScan.getStartRow()); + } + if (tScan.isSetStopRow()) { + scan.setStopRow(tScan.getStopRow()); + } + if (tScan.isSetTimestamp()) { + scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp()); + } + if (tScan.isSetCaching()) { + scan.setCaching(tScan.getCaching()); + } + if (tScan.isSetColumns() && tScan.getColumns().size() != 0) { + for(ByteBuffer column : tScan.getColumns()) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); } } - if (tScan.isSetFilterString()) { - ParseFilter parseFilter = new ParseFilter(); - scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString())); - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); } + if (tScan.isSetFilterString()) { + ParseFilter parseFilter = new ParseFilter(); + scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString())); + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } } @Override public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, - List columns) throws IOError { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow)); - if(columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } + List columns) throws IOError { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow)); + if(columns != null && columns.size() != 0) { + for(ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); } } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } } @Override @@ -826,7 +930,7 @@ public class ThriftServer { Filter f = new WhileMatchFilter( new PrefixFilter(getBytes(startAndPrefix))); scan.setFilter(f); - if(columns != null && columns.size() != 0) { + if (columns != null && columns.size() != 0) { for(ByteBuffer column : columns) { byte [][] famQf = KeyValue.parseColumn(getBytes(column)); if(famQf.length == 1) { @@ -849,8 +953,8 @@ public class ThriftServer { HTable table = getTable(tableName); Scan scan = new Scan(getBytes(startRow)); scan.setTimeRange(Long.MIN_VALUE, timestamp); - if(columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { + if (columns != null && columns.size() != 0) { + for (ByteBuffer column : columns) { byte [][] famQf = KeyValue.parseColumn(getBytes(column)); if(famQf.length == 1) { scan.addFamily(famQf[0]); @@ -873,8 +977,8 @@ public class ThriftServer { HTable table = getTable(tableName); Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); scan.setTimeRange(Long.MIN_VALUE, timestamp); - if(columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { + if (columns != null && columns.size() != 0) { + for (ByteBuffer column : columns) { byte [][] famQf = KeyValue.parseColumn(getBytes(column)); if(famQf.length == 1) { scan.addFamily(famQf[0]); @@ -911,7 +1015,7 @@ public class ThriftServer { } @Override - public List getRowOrBefore(ByteBuffer tableName, ByteBuffer row, + public List getRowOrBefore(ByteBuffer tableName, ByteBuffer row, ByteBuffer family) throws IOError { try { HTable table = getTable(getBytes(tableName)); @@ -966,69 +1070,98 @@ public class ThriftServer { } } + public ThriftServer(Configuration conf) { + this.conf = HBaseConfiguration.create(conf); + } + // // Main program and support routines // - private static void printUsageAndExit(Options options, int exitCode) { + private static void printUsageAndExit(Options options, int exitCode) + throws ExitCodeException { HelpFormatter formatter = new HelpFormatter(); formatter.printHelp("Thrift", null, options, - "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" + - "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" + - " send a kill signal to the thrift server pid", - true); - System.exit(exitCode); + "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" + + "To shutdown the thrift server run 'bin/hbase-daemon.sh stop " + + "thrift' or send a kill signal to the thrift server pid", + true); + throw new ExitCodeException(exitCode, ""); } - private static final String DEFAULT_LISTEN_PORT = "9090"; - /* - * Start up the Thrift server. + * Start up or shuts down the Thrift server, depending on the arguments. * @param args */ - static private void doMain(final String[] args) throws Exception { - Log LOG = LogFactory.getLog("ThriftServer"); - + void doMain(final String[] args) throws Exception { Options options = new Options(); - options.addOption("b", "bind", true, "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]"); - options.addOption("p", "port", true, "Port to bind to [default: 9090]"); - options.addOption("f", "framed", false, "Use framed transport"); - options.addOption("c", "compact", false, "Use the compact protocol"); + options.addOption("b", BIND_OPTION, true, "Address to bind " + + "the Thrift server to. Not supported by the Nonblocking and " + + "HsHa server [default: " + DEFAULT_BIND_ADDR + "]"); + options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " + + DEFAULT_LISTEN_PORT + "]"); + options.addOption("f", FRAMED_OPTION, false, "Use framed transport"); + options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol"); options.addOption("h", "help", false, "Print help information"); - OptionGroup servers = new OptionGroup(); - servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport.")); - servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport.")); - servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default.")); - options.addOptionGroup(servers); + options.addOption("m", MIN_WORKERS_OPTION, true, + "The minimum number of worker threads for " + + ImplType.THREAD_POOL.simpleClassName()); + + options.addOption("w", MAX_WORKERS_OPTION, true, + "The maximum number of worker threads for " + + ImplType.THREAD_POOL.simpleClassName()); + + options.addOption("q", MAX_QUEUE_SIZE_OPTION, true, + "The maximum number of queued requests in " + + ImplType.THREAD_POOL.simpleClassName()); + + options.addOption("k", KEEP_ALIVE_SEC_OPTION, true, + "The amount of time in secods to keep a thread alive when idle in " + + ImplType.THREAD_POOL.simpleClassName()); + + options.addOptionGroup(ImplType.createOptionGroup()); CommandLineParser parser = new PosixParser(); CommandLine cmd = parser.parse(options, args); - /** - * This is so complicated to please both bin/hbase and bin/hbase-daemon. - * hbase-daemon provides "start" and "stop" arguments - * hbase should print the help if no argument is provided - */ + // This is so complicated to please both bin/hbase and bin/hbase-daemon. + // hbase-daemon provides "start" and "stop" arguments + // hbase should print the help if no argument is provided List commandLine = Arrays.asList(args); boolean stop = commandLine.contains("stop"); boolean start = commandLine.contains("start"); - if (cmd.hasOption("help") || !start || stop) { + boolean invalidStartStop = (start && stop) || (!start && !stop); + if (cmd.hasOption("help") || invalidStartStop) { + if (invalidStartStop) { + LOG.error("Exactly one of 'start' and 'stop' has to be specified"); + } printUsageAndExit(options, 1); } // Get port to bind to int listenPort = 0; try { - listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT)); + listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION, + String.valueOf(DEFAULT_LISTEN_PORT))); } catch (NumberFormatException e) { LOG.error("Could not parse the value provided for the port option", e); printUsageAndExit(options, -1); } + // Make optional changes to the configuration based on command-line options + optionToConf(cmd, MIN_WORKERS_OPTION, + conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY); + optionToConf(cmd, MAX_WORKERS_OPTION, + conf, TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY); + optionToConf(cmd, MAX_QUEUE_SIZE_OPTION, + conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY); + optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, + conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); + // Construct correct ProtocolFactory TProtocolFactory protocolFactory; - if (cmd.hasOption("compact")) { + if (cmd.hasOption(COMPACT_OPTION)) { LOG.debug("Using compact protocol"); protocolFactory = new TCompactProtocol.Factory(); } else { @@ -1036,78 +1169,125 @@ public class ThriftServer { protocolFactory = new TBinaryProtocol.Factory(); } - HBaseHandler handler = new HBaseHandler(); - Hbase.Processor processor = new Hbase.Processor(handler); + HBaseHandler handler = new HBaseHandler(conf); + Hbase.Processor processor = + new Hbase.Processor(handler); + ImplType implType = ImplType.getServerImpl(cmd); - TServer server; - if (cmd.hasOption("nonblocking") || cmd.hasOption("hsha")) { - if (cmd.hasOption("bind")) { - LOG.error("The Nonblocking and HsHa servers don't support IP address binding at the moment." + - " See https://issues.apache.org/jira/browse/HBASE-2155 for details."); - printUsageAndExit(options, -1); + // Construct correct TransportFactory + TTransportFactory transportFactory; + if (cmd.hasOption(FRAMED_OPTION) || implType.isAlwaysFramed) { + transportFactory = new TFramedTransport.Factory(); + LOG.debug("Using framed transport"); + } else { + transportFactory = new TTransportFactory(); + } + + if (cmd.hasOption(BIND_OPTION) && !implType.canSpecifyBindIP) { + LOG.error("Server types " + Joiner.on(", ").join( + ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " + + "address binding at the moment. See " + + "https://issues.apache.org/jira/browse/HBASE-2155 for details."); + printUsageAndExit(options, -1); + } + + if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING) { + if (cmd.hasOption(BIND_OPTION)) { + throw new RuntimeException("-" + BIND_OPTION + " not supported with " + + implType); } - TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(listenPort); - TFramedTransport.Factory transportFactory = new TFramedTransport.Factory(); + TNonblockingServerTransport serverTransport = + new TNonblockingServerSocket(listenPort); - if (cmd.hasOption("nonblocking")) { - TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport); - serverArgs.processor(processor); - serverArgs.transportFactory(transportFactory); - serverArgs.protocolFactory(protocolFactory); - - LOG.info("starting HBase Nonblocking Thrift server on " + Integer.toString(listenPort)); + if (implType == ImplType.NONBLOCKING) { + TNonblockingServer.Args serverArgs = + new TNonblockingServer.Args(serverTransport); + setServerArgs(serverArgs, processor, transportFactory, + protocolFactory); server = new TNonblockingServer(serverArgs); } else { THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); serverArgs.processor(processor); serverArgs.transportFactory(transportFactory); serverArgs.protocolFactory(protocolFactory); - - LOG.info("starting HBase HsHA Thrift server on " + Integer.toString(listenPort)); server = new THsHaServer(serverArgs); } + LOG.info("starting HBase " + implType.simpleClassName() + + " server on " + Integer.toString(listenPort)); + } else if (implType == ImplType.THREAD_POOL) { + // Thread pool server. Get the IP address to bind to. + InetAddress listenAddress = getBindAddress(options, cmd); + + TServerTransport serverTransport = new TServerSocket( + new InetSocketAddress(listenAddress, listenPort)); + + TBoundedThreadPoolServer.Args serverArgs = new TBoundedThreadPoolServer.Args( + serverTransport, conf); + setServerArgs(serverArgs, processor, transportFactory, protocolFactory); + LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " + + listenAddress + ":" + Integer.toString(listenPort) + + "; " + serverArgs); + server = new TBoundedThreadPoolServer(serverArgs); } else { - // Get IP address to bind to - InetAddress listenAddress = null; - if (cmd.hasOption("bind")) { - try { - listenAddress = InetAddress.getByName(cmd.getOptionValue("bind")); - } catch (UnknownHostException e) { - LOG.error("Could not bind to provided ip address", e); - printUsageAndExit(options, -1); - } - } else { - listenAddress = InetAddress.getByName("0.0.0.0"); - } - TServerTransport serverTransport = new TServerSocket(new InetSocketAddress(listenAddress, listenPort)); + throw new AssertionError("Unsupported Thrift server implementation: " + + implType.simpleClassName()); + } - // Construct correct TransportFactory - TTransportFactory transportFactory; - if (cmd.hasOption("framed")) { - transportFactory = new TFramedTransport.Factory(); - LOG.debug("Using framed transport"); - } else { - transportFactory = new TTransportFactory(); - } - - TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); - serverArgs.processor(processor); - serverArgs.protocolFactory(protocolFactory); - serverArgs.transportFactory(transportFactory); - LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort)); - server = new TThreadPoolServer(serverArgs); + // A sanity check that we instantiated the right type of server. + if (server.getClass() != implType.serverClass) { + throw new AssertionError("Expected to create Thrift server class " + + implType.serverClass.getName() + " but got " + + server.getClass().getName()); } server.serve(); } + public void stop() { + server.stop(); + } + + private InetAddress getBindAddress(Options options, CommandLine cmd) + throws ExitCodeException { + InetAddress listenAddress = null; + String bindAddressStr = cmd.getOptionValue(BIND_OPTION, DEFAULT_BIND_ADDR); + try { + listenAddress = InetAddress.getByName(bindAddressStr); + } catch (UnknownHostException e) { + LOG.error("Could not resolve the bind address specified: " + + bindAddressStr, e); + printUsageAndExit(options, -1); + } + return listenAddress; + } + + private static void setServerArgs(AbstractServerArgs serverArgs, + Processor processor, TTransportFactory transportFactory, + TProtocolFactory protocolFactory) { + serverArgs.processor(processor); + serverArgs.transportFactory(transportFactory); + serverArgs.protocolFactory(protocolFactory); + } + + private static void optionToConf(CommandLine cmd, String option, + Configuration conf, String destConfKey) { + if (cmd.hasOption(option)) { + conf.set(destConfKey, cmd.getOptionValue(option)); + } + } + /** * @param args * @throws Exception */ public static void main(String [] args) throws Exception { - VersionInfo.logVersion(); - doMain(args); + VersionInfo.logVersion(); + try { + new ThriftServer(HBaseConfiguration.create()).doMain(args); + } catch (ExitCodeException ex) { + System.exit(ex.getExitCode()); + } } + } diff --git a/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 0819e6c17a8..6f81b6259ff 100644 --- a/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -25,6 +25,7 @@ import java.io.PrintWriter; import org.apache.hadoop.util.ReflectionUtils; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.TimeUnit; /** * Thread Utility @@ -127,4 +128,28 @@ public class Threads { e.printStackTrace(); } } + + /** + * Sleeps for the given amount of time even if interrupted. Preserves + * the interrupt status. + * @param msToWait the amount of time to sleep in milliseconds + */ + public static void sleepWithoutInterrupt(final long msToWait) { + long timeMillis = System.currentTimeMillis(); + long endTime = timeMillis + msToWait; + boolean interrupted = false; + while (timeMillis < endTime) { + try { + Thread.sleep(endTime - timeMillis); + } catch (InterruptedException ex) { + interrupted = true; + } + timeMillis = System.currentTimeMillis(); + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } diff --git a/src/main/resources/hbase-default.xml b/src/main/resources/hbase-default.xml index add8b9ed1c6..7059c60eecb 100644 --- a/src/main/resources/hbase-default.xml +++ b/src/main/resources/hbase-default.xml @@ -824,4 +824,31 @@ (You will have to restart your cluster after setting it). + + hbase.thrift.minWorkerThreads + 16 + + The "core size" of the thread pool. New threads are created on every + connection until this many threads are created. + + + + hbase.thrift.maxWorkerThreads + 1000 + + The maximum size of the thread pool. When the pending request queue + overflows, new threads are created until their number reaches this number. + After that, the server starts dropping connections. + + + + hbase.thrift.maxQueuedRequests + 1000 + + The maximum number of pending Thrift connections waiting in the queue. If + there are no idle threads in the pool, the server queues requests. Only + when the queue overflows, new threads are added, up to + hbase.thrift.maxQueuedRequests threads. + + diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 04d1c551944..6bc27caeb70 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -25,6 +25,7 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Field; +import java.net.ServerSocket; import java.security.MessageDigest; import java.util.ArrayList; import java.util.Arrays; @@ -1714,4 +1715,31 @@ public class HBaseTestingUtility { return table; } + + private static final int MIN_RANDOM_PORT = 0xc000; + private static final int MAX_RANDOM_PORT = 0xfffe; + + /** + * Returns a random port. These ports cannot be registered with IANA and are + * intended for dynamic allocation (see http://bit.ly/dynports). + */ + public static int randomPort() { + return MIN_RANDOM_PORT + + new Random().nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT); + } + + public static int randomFreePort() { + int port = 0; + do { + port = randomPort(); + try { + ServerSocket sock = new ServerSocket(port); + sock.close(); + } catch (IOException ex) { + port = 0; + } + } while (port == 0); + return port; + } + } diff --git a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java index 25b77f94053..ffa6ed5b59f 100644 --- a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java +++ b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.thrift; import static org.junit.Assert.*; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -30,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.thrift.generated.BatchMutation; import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; +import org.apache.hadoop.hbase.thrift.generated.Hbase; import org.apache.hadoop.hbase.thrift.generated.Mutation; import org.apache.hadoop.hbase.thrift.generated.TCell; import org.apache.hadoop.hbase.thrift.generated.TRowResult; @@ -47,20 +47,22 @@ import org.junit.experimental.categories.Category; public class TestThriftServer { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); protected static final int MAXVERSIONS = 3; - private static ByteBuffer $bb(String i) { + + private static ByteBuffer asByteBuffer(String i) { return ByteBuffer.wrap(Bytes.toBytes(i)); } + // Static names for tables, columns, rows, and values - private static ByteBuffer tableAname = $bb("tableA"); - private static ByteBuffer tableBname = $bb("tableB"); - private static ByteBuffer columnAname = $bb("columnA:"); - private static ByteBuffer columnBname = $bb("columnB:"); - private static ByteBuffer rowAname = $bb("rowA"); - private static ByteBuffer rowBname = $bb("rowB"); - private static ByteBuffer valueAname = $bb("valueA"); - private static ByteBuffer valueBname = $bb("valueB"); - private static ByteBuffer valueCname = $bb("valueC"); - private static ByteBuffer valueDname = $bb("valueD"); + private static ByteBuffer tableAname = asByteBuffer("tableA"); + private static ByteBuffer tableBname = asByteBuffer("tableB"); + private static ByteBuffer columnAname = asByteBuffer("columnA:"); + private static ByteBuffer columnBname = asByteBuffer("columnB:"); + private static ByteBuffer rowAname = asByteBuffer("rowA"); + private static ByteBuffer rowBname = asByteBuffer("rowB"); + private static ByteBuffer valueAname = asByteBuffer("valueA"); + private static ByteBuffer valueBname = asByteBuffer("valueB"); + private static ByteBuffer valueCname = asByteBuffer("valueC"); + private static ByteBuffer valueDname = asByteBuffer("valueD"); @BeforeClass public static void beforeClass() throws Exception { @@ -100,7 +102,11 @@ public class TestThriftServer { public void doTestTableCreateDrop() throws Exception { ThriftServer.HBaseHandler handler = new ThriftServer.HBaseHandler(UTIL.getConfiguration()); + createTestTables(handler); + dropTestTables(handler); + } + public static void createTestTables(Hbase.Iface handler) throws Exception { // Create/enable/disable/delete tables, ensure methods act correctly assertEquals(handler.getTableNames().size(), 0); handler.createTable(tableAname, getColumnDescriptors()); @@ -109,6 +115,9 @@ public class TestThriftServer { assertTrue(handler.isTableEnabled(tableAname)); handler.createTable(tableBname, new ArrayList()); assertEquals(handler.getTableNames().size(), 2); + } + + public static void dropTestTables(Hbase.Iface handler) throws Exception { handler.disableTable(tableBname); assertFalse(handler.isTableEnabled(tableBname)); handler.deleteTable(tableBname); @@ -121,7 +130,7 @@ public class TestThriftServer { handler.disableTable(tableAname);*/ handler.deleteTable(tableAname); } - + /** * Tests adding a series of Mutations and BatchMutations, including a * delete mutation. Also tests data retrieval, and getting back multiple @@ -343,7 +352,7 @@ public class TestThriftServer { handler.disableTable(tableAname); handler.deleteTable(tableAname); } - + /** * For HBASE-2556 * Tests for GetTableRegions @@ -357,19 +366,19 @@ public class TestThriftServer { int regionCount = handler.getTableRegions(tableAname).size(); assertEquals("empty table should have only 1 region, " + "but found " + regionCount, regionCount, 1); - handler.disableTable(tableAname); + handler.disableTable(tableAname); handler.deleteTable(tableAname); regionCount = handler.getTableRegions(tableAname).size(); assertEquals("non-existing table should have 0 region, " + - "but found " + regionCount, regionCount, 0); - } - + "but found " + regionCount, regionCount, 0); + } + /** * * @return a List of ColumnDescriptors for use in creating a table. Has one * default ColumnDescriptor and one ColumnDescriptor with fewer versions */ - private List getColumnDescriptors() { + private static List getColumnDescriptors() { ArrayList cDescriptors = new ArrayList(); // A default ColumnDescriptor diff --git a/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java new file mode 100644 index 00000000000..c9aa0d9e487 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java @@ -0,0 +1,230 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.thrift; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.LargeTests; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.thrift.ThriftServer.ImplType; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import com.google.common.base.Joiner; + +/** + * Start the HBase Thrift server on a random port through the command-line + * interface and talk to it from client side. + */ +@Category(LargeTests.class) +@RunWith(Parameterized.class) +public class TestThriftServerCmdLine { + + public static final Log LOG = + LogFactory.getLog(TestThriftServerCmdLine.class); + + private final ImplType implType; + private boolean specifyFramed; + private boolean specifyBindIP; + private boolean specifyCompact; + + private static final HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + + private Thread cmdLineThread; + private volatile Exception cmdLineException; + + private Exception clientSideException; + + private ThriftServer thriftServer; + private int port; + + @Parameters + public static Collection getParameters() { + Collection parameters = new ArrayList(); + for (ThriftServer.ImplType implType : ThriftServer.ImplType.values()) { + for (boolean specifyFramed : new boolean[] {false, true}) { + for (boolean specifyBindIP : new boolean[] {false, true}) { + if (specifyBindIP && !implType.canSpecifyBindIP) { + continue; + } + for (boolean specifyCompact : new boolean[] {false, true}) { + parameters.add(new Object[]{implType, new Boolean(specifyFramed), + new Boolean(specifyBindIP), new Boolean(specifyCompact)}); + } + } + } + } + return parameters; + } + + public TestThriftServerCmdLine(ImplType implType, boolean specifyFramed, + boolean specifyBindIP, boolean specifyCompact) { + this.implType = implType; + this.specifyFramed = specifyFramed; + this.specifyBindIP = specifyBindIP; + this.specifyCompact = specifyCompact; + LOG.debug("implType=" + implType + ", " + + "specifyFramed=" + specifyFramed + ", " + + "specifyBindIP=" + specifyBindIP + ", " + + "specifyCompact=" + specifyCompact); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private void startCmdLineThread(final String[] args) { + LOG.info("Starting HBase Thrift server with command line: " + + Joiner.on(" ").join(args)); + + cmdLineException = null; + cmdLineThread = new Thread(new Runnable() { + @Override + public void run() { + try { + thriftServer.doMain(args); + } catch (Exception e) { + cmdLineException = e; + } + } + }); + cmdLineThread.setName(ThriftServer.class.getSimpleName() + + "-cmdline"); + cmdLineThread.start(); + } + + @Test(timeout=30 * 1000) + public void testRunThriftServer() throws Exception { + List args = new ArrayList(); + if (implType != null) { + String serverTypeOption = implType.toString(); + assertTrue(serverTypeOption.startsWith("-")); + args.add(serverTypeOption); + } + port = HBaseTestingUtility.randomFreePort(); + args.add("-" + ThriftServer.PORT_OPTION); + args.add(String.valueOf(port)); + if (specifyFramed) { + args.add("-" + ThriftServer.FRAMED_OPTION); + } + if (specifyBindIP) { + args.add("-" + ThriftServer.BIND_OPTION); + args.add(InetAddress.getLocalHost().getHostName()); + } + if (specifyCompact) { + args.add("-" + ThriftServer.COMPACT_OPTION); + } + args.add("start"); + + thriftServer = new ThriftServer(TEST_UTIL.getConfiguration()); + startCmdLineThread(args.toArray(new String[0])); + Threads.sleepWithoutInterrupt(2000); + + try { + talkToThriftServer(); + } catch (Exception ex) { + clientSideException = ex; + } finally { + stopCmdLineThread(); + } + + Class expectedClass; + if (implType != null) { + expectedClass = implType.serverClass; + } else { + expectedClass = TBoundedThreadPoolServer.class; + } + assertEquals(expectedClass, thriftServer.server.getClass()); + + if (clientSideException != null) { + LOG.error("Thrift client threw an exception", clientSideException); + throw new Exception(clientSideException); + } + } + + private void talkToThriftServer() throws Exception { + TSocket sock = new TSocket(InetAddress.getLocalHost().getHostName(), + port); + TTransport transport = sock; + if (specifyFramed || implType.isAlwaysFramed) { + transport = new TFramedTransport(transport); + } + + sock.open(); + TProtocol prot; + if (specifyCompact) { + prot = new TCompactProtocol(transport); + } else { + prot = new TBinaryProtocol(transport); + } + Hbase.Client client = new Hbase.Client(prot); + List tableNames = client.getTableNames(); + if (tableNames.isEmpty()) { + TestThriftServer.createTestTables(client); + assertEquals(2, client.getTableNames().size()); + } else { + assertEquals(2, tableNames.size()); + assertEquals(2, client.getColumnDescriptors(tableNames.get(0)).size()); + } + sock.close(); + } + + private void stopCmdLineThread() throws Exception { + LOG.debug("Stopping " + implType.simpleClassName() + " Thrift server"); + thriftServer.stop(); + cmdLineThread.join(); + if (cmdLineException != null) { + LOG.error("Command-line invocation of HBase Thrift server threw an " + + "exception", cmdLineException); + throw new Exception(cmdLineException); + } + } + +} diff --git a/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java b/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java new file mode 100644 index 00000000000..8ec534d8b4a --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/util/TestThreads.java @@ -0,0 +1,75 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.*; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; + +public class TestThreads { + private static final Log LOG = LogFactory.getLog(TestThreads.class); + + private static final int SLEEP_TIME_MS = 5000; + private static final int TOLERANCE_MS = (int) (0.05 * SLEEP_TIME_MS); + + private volatile boolean wasInterrupted; + + @Test(timeout=6000) + public void testSleepWithoutInterrupt() throws InterruptedException { + Thread sleeper = new Thread(new Runnable() { + @Override + public void run() { + LOG.debug("Sleeper thread: sleeping for " + SLEEP_TIME_MS); + Threads.sleepWithoutInterrupt(SLEEP_TIME_MS); + LOG.debug("Sleeper thread: finished sleeping"); + wasInterrupted = Thread.currentThread().isInterrupted(); + } + }); + LOG.debug("Starting sleeper thread (" + SLEEP_TIME_MS + " ms)"); + sleeper.start(); + long startTime = System.currentTimeMillis(); + LOG.debug("Main thread: sleeping for 500 ms"); + Threads.sleep(500); + + LOG.debug("Interrupting the sleeper thread and sleeping for 2000 ms"); + sleeper.interrupt(); + Threads.sleep(2000); + + LOG.debug("Interrupting the sleeper thread and sleeping for 1000 ms"); + sleeper.interrupt(); + Threads.sleep(1000); + + LOG.debug("Interrupting the sleeper thread again"); + sleeper.interrupt(); + sleeper.join(); + + assertTrue("sleepWithoutInterrupt did not preserve the thread's " + + "interrupted status", wasInterrupted); + + long timeElapsed = System.currentTimeMillis() - startTime; + assertTrue("Elapsed time " + timeElapsed + " ms is out of the expected " + + "range of the sleep time " + SLEEP_TIME_MS, + Math.abs(timeElapsed - SLEEP_TIME_MS) < TOLERANCE_MS); + LOG.debug("Target sleep time: " + SLEEP_TIME_MS + ", time elapsed: " + + timeElapsed); + } + +}