diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java b/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
new file mode 100644
index 00000000000..494f5a53c93
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A TThreadedSelectorServer.Args that reads hadoop configuration
+ */
+public class HThreadedSelectorServerArgs extends TThreadedSelectorServer.Args {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TThreadedSelectorServer.class);
+
+ /**
+ * Number of selector threads for reading and writing socket
+ */
+ public static final String SELECTOR_THREADS_CONF_KEY =
+ "hbase.thrift.selector.threads";
+
+ /**
+ * Number fo threads for processing the thrift calls
+ */
+ public static final String WORKER_THREADS_CONF_KEY =
+ "hbase.thrift.worker.threads";
+
+ /**
+ * Time to wait for server to stop gracefully
+ */
+ public static final String STOP_TIMEOUT_CONF_KEY =
+ "hbase.thrift.stop.timeout.seconds";
+
+ /**
+ * Maximum number of accepted elements per selector
+ */
+ public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY =
+ "hbase.thrift.accept.queue.size.per.selector";
+
+ /**
+ * The strategy for handling new accepted connections.
+ */
+ public static final String ACCEPT_POLICY_CONF_KEY =
+ "hbase.thrift.accept.policy";
+
+ public HThreadedSelectorServerArgs(
+ TNonblockingServerTransport transport, Configuration conf) {
+ super(transport);
+ readConf(conf);
+ }
+
+ private void readConf(Configuration conf) {
+ int selectorThreads = conf.getInt(
+ SELECTOR_THREADS_CONF_KEY, getSelectorThreads());
+ int workerThreads = conf.getInt(
+ WORKER_THREADS_CONF_KEY, getWorkerThreads());
+ int stopTimeoutVal = conf.getInt(
+ STOP_TIMEOUT_CONF_KEY, getStopTimeoutVal());
+ int acceptQueueSizePerThread = conf.getInt(
+ ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY, getAcceptQueueSizePerThread());
+ AcceptPolicy acceptPolicy = AcceptPolicy.valueOf(conf.get(
+ ACCEPT_POLICY_CONF_KEY, getAcceptPolicy().toString()).toUpperCase());
+
+ super.selectorThreads(selectorThreads)
+ .workerThreads(workerThreads)
+ .stopTimeoutVal(stopTimeoutVal)
+ .acceptQueueSizePerThread(acceptQueueSizePerThread)
+ .acceptPolicy(acceptPolicy);
+
+ LOG.info("Read configuration selectorThreads:" + selectorThreads +
+ " workerThreads:" + workerThreads +
+ " stopTimeoutVal:" + stopTimeoutVal + "sec" +
+ " acceptQueueSizePerThread:" + acceptQueueSizePerThread +
+ " acceptPolicy:" + acceptPolicy);
+ }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
new file mode 100644
index 00000000000..b6a6349709f
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -0,0 +1,1237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import static org.apache.hadoop.hbase.util.Bytes.getBytes;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
+import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
+import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
+import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.hbase.thrift.generated.TScan;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.net.DNS;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.base.Joiner;
+
+/**
+ * ThriftServerRunner - this class starts up a Thrift server which implements
+ * the Hbase API specified in the Hbase.thrift IDL file.
+ */
+public class ThriftServerRunner implements Runnable {
+
+ private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
+
+ static final String SERVER_TYPE_CONF_KEY =
+ "hbase.regionserver.thrift.server.type";
+
+ static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
+ static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
+ static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
+ static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
+
+ private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
+ private static final int DEFAULT_LISTEN_PORT = 9090;
+
+ private Configuration conf;
+ volatile TServer tserver;
+ private final HBaseHandler handler;
+
+ /** An enum of server implementation selections */
+ enum ImplType {
+ HS_HA("hsha", true, THsHaServer.class, false),
+ NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
+ THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
+ THREADED_SELECTOR(
+ "threadedselector", true, TThreadedSelectorServer.class, false);
+
+ public static final ImplType DEFAULT = THREAD_POOL;
+
+ final String option;
+ final boolean isAlwaysFramed;
+ final Class extends TServer> serverClass;
+ final boolean canSpecifyBindIP;
+
+ ImplType(String option, boolean isAlwaysFramed,
+ Class extends TServer> serverClass, boolean canSpecifyBindIP) {
+ this.option = option;
+ this.isAlwaysFramed = isAlwaysFramed;
+ this.serverClass = serverClass;
+ this.canSpecifyBindIP = canSpecifyBindIP;
+ }
+
+ /**
+ * @return -option
so we can get the list of options from
+ * {@link #values()}
+ */
+ @Override
+ public String toString() {
+ return "-" + option;
+ }
+
+ String getDescription() {
+ StringBuilder sb = new StringBuilder("Use the " +
+ serverClass.getSimpleName());
+ if (isAlwaysFramed) {
+ sb.append(" This implies the framed transport.");
+ }
+ if (this == DEFAULT) {
+ sb.append("This is the default.");
+ }
+ return sb.toString();
+ }
+
+ static OptionGroup createOptionGroup() {
+ OptionGroup group = new OptionGroup();
+ for (ImplType t : values()) {
+ group.addOption(new Option(t.option, t.getDescription()));
+ }
+ return group;
+ }
+
+ static ImplType getServerImpl(Configuration conf) {
+ String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
+ for (ImplType t : values()) {
+ if (confType.equals(t.option)) {
+ return t;
+ }
+ }
+ throw new AssertionError("Unknown server ImplType.option:" + confType);
+ }
+
+ static void setServerImpl(CommandLine cmd, Configuration conf) {
+ ImplType chosenType = null;
+ int numChosen = 0;
+ for (ImplType t : values()) {
+ if (cmd.hasOption(t.option)) {
+ chosenType = t;
+ ++numChosen;
+ }
+ }
+ if (numChosen != 1) {
+ throw new AssertionError("Exactly one option out of " +
+ Arrays.toString(values()) + " has to be specified");
+ }
+ LOG.info("Setting thrift server to " + chosenType.option);
+ conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
+ }
+
+ public String simpleClassName() {
+ return serverClass.getSimpleName();
+ }
+
+ public static List serversThatCannotSpecifyBindIP() {
+ List l = new ArrayList();
+ for (ImplType t : values()) {
+ if (!t.canSpecifyBindIP) {
+ l.add(t.simpleClassName());
+ }
+ }
+ return l;
+ }
+
+ }
+
+ public ThriftServerRunner(Configuration conf) throws IOException {
+ this(conf, new ThriftServerRunner.HBaseHandler(conf));
+ }
+
+ public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
+ this.conf = HBaseConfiguration.create(conf);
+ this.handler = handler;
+ }
+
+ /*
+ * Runs the Thrift server
+ */
+ @Override
+ public void run() {
+ try {
+ setupServer();
+ tserver.serve();
+ } catch (Exception e) {
+ LOG.fatal("Cannot run ThriftServer");
+ // Crash the process if the ThriftServer is not running
+ System.exit(-1);
+ }
+ }
+
+ public void shutdown() {
+ if (tserver != null) {
+ tserver.stop();
+ tserver = null;
+ }
+ }
+
+ /**
+ * Setting up the thrift TServer
+ */
+ private void setupServer() throws Exception {
+ // Get port to bind to
+ int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+
+ // Construct correct ProtocolFactory
+ TProtocolFactory protocolFactory;
+ if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
+ LOG.debug("Using compact protocol");
+ protocolFactory = new TCompactProtocol.Factory();
+ } else {
+ LOG.debug("Using binary protocol");
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
+
+ Hbase.Processor processor =
+ new Hbase.Processor(handler);
+ ImplType implType = ImplType.getServerImpl(conf);
+
+ // Construct correct TransportFactory
+ TTransportFactory transportFactory;
+ if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
+ transportFactory = new TFramedTransport.Factory();
+ LOG.debug("Using framed transport");
+ } else {
+ transportFactory = new TTransportFactory();
+ }
+
+ if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+ LOG.error("Server types " + Joiner.on(", ").join(
+ ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
+ "address binding at the moment. See " +
+ "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
+ throw new RuntimeException(
+ "-" + BIND_CONF_KEY + " not supported with " + implType);
+ }
+
+ if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+ implType == ImplType.THREADED_SELECTOR) {
+
+ TNonblockingServerTransport serverTransport =
+ new TNonblockingServerSocket(listenPort);
+
+ if (implType == ImplType.NONBLOCKING) {
+ TNonblockingServer.Args serverArgs =
+ new TNonblockingServer.Args(serverTransport);
+ serverArgs.processor(processor)
+ .transportFactory(transportFactory)
+ .protocolFactory(protocolFactory);
+ tserver = new TNonblockingServer(serverArgs);
+ } else if (implType == ImplType.HS_HA) {
+ THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+ serverArgs.processor(processor)
+ .transportFactory(transportFactory)
+ .protocolFactory(protocolFactory);
+ tserver = new THsHaServer(serverArgs);
+ } else { // THREADED_SELECTOR
+ TThreadedSelectorServer.Args serverArgs =
+ new HThreadedSelectorServerArgs(serverTransport, conf);
+ serverArgs.processor(processor)
+ .transportFactory(transportFactory)
+ .protocolFactory(protocolFactory);
+ tserver = new TThreadedSelectorServer(serverArgs);
+ }
+ LOG.info("starting HBase " + implType.simpleClassName() +
+ " server on " + Integer.toString(listenPort));
+ } else if (implType == ImplType.THREAD_POOL) {
+ // Thread pool server. Get the IP address to bind to.
+ InetAddress listenAddress = getBindAddress(conf);
+
+ TServerTransport serverTransport = new TServerSocket(
+ new InetSocketAddress(listenAddress, listenPort));
+
+ TBoundedThreadPoolServer.Args serverArgs =
+ new TBoundedThreadPoolServer.Args(serverTransport, conf);
+ serverArgs.processor(processor)
+ .transportFactory(transportFactory)
+ .protocolFactory(protocolFactory);
+ LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+ + listenAddress + ":" + Integer.toString(listenPort)
+ + "; " + serverArgs);
+ tserver = new TBoundedThreadPoolServer(serverArgs);
+ } else {
+ throw new AssertionError("Unsupported Thrift server implementation: " +
+ implType.simpleClassName());
+ }
+
+ // A sanity check that we instantiated the right type of server.
+ if (tserver.getClass() != implType.serverClass) {
+ throw new AssertionError("Expected to create Thrift server class " +
+ implType.serverClass.getName() + " but got " +
+ tserver.getClass().getName());
+ }
+
+ // login the server principal (if using secure Hadoop)
+ Configuration conf = handler.conf;
+ if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) {
+ String machineName = Strings.domainNamePointerToHostName(
+ DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"),
+ conf.get("hbase.thrift.dns.nameserver", "default")));
+ User.login(conf, "hbase.thrift.keytab.file",
+ "hbase.thrift.kerberos.principal", machineName);
+ }
+ }
+
+ private InetAddress getBindAddress(Configuration conf)
+ throws UnknownHostException {
+ String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
+ return InetAddress.getByName(bindAddressStr);
+ }
+
+ /**
+ * The HBaseHandler is a glue object that connects Thrift RPC calls to the
+ * HBase client API primarily defined in the HBaseAdmin and HTable objects.
+ */
+ public static class HBaseHandler implements Hbase.Iface {
+ protected Configuration conf;
+ protected HBaseAdmin admin = null;
+ protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+ // nextScannerId and scannerMap are used to manage scanner state
+ protected int nextScannerId = 0;
+ protected HashMap scannerMap = null;
+
+ private static ThreadLocal