diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java
new file mode 100644
index 00000000000..8e3d0048a3d
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thrift-related constants.
+ */
+@InterfaceAudience.Private
+public final class Constants {
+ private Constants(){}
+
+ public static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k
+
+ public static final String SERVER_TYPE_CONF_KEY =
+ "hbase.regionserver.thrift.server.type";
+
+ public static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
+ public static final boolean COMPACT_CONF_DEFAULT = false;
+
+ public static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
+ public static final boolean FRAMED_CONF_DEFAULT = false;
+
+ public static final String MAX_FRAME_SIZE_CONF_KEY =
+ "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
+ public static final int MAX_FRAME_SIZE_CONF_DEFAULT = 2;
+
+ public static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
+ public static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
+
+ public static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min";
+ public static final int HTTP_MIN_THREADS_KEY_DEFAULT = 2;
+
+ public static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max";
+ public static final int HTTP_MAX_THREADS_KEY_DEFAULT = 100;
+
+ // ssl related configs
+ public static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled";
+ public static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store";
+ public static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY =
+ "hbase.thrift.ssl.keystore.password";
+ public static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY
+ = "hbase.thrift.ssl.keystore.keypassword";
+ public static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY =
+ "hbase.thrift.ssl.exclude.cipher.suites";
+ public static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY =
+ "hbase.thrift.ssl.include.cipher.suites";
+ public static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY =
+ "hbase.thrift.ssl.exclude.protocols";
+ public static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY =
+ "hbase.thrift.ssl.include.protocols";
+
+ public static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser";
+
+ //kerberos related configs
+ public static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface";
+ public static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver";
+ public static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal";
+ public static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file";
+ public static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal";
+ public static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file";
+
+ /**
+ * Amount of time in milliseconds before a server thread will time out
+ * waiting for a client to send data on a connected socket. Currently this
+ * applies only to TBoundedThreadPoolServer.
+ */
+ public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
+ "hbase.thrift.server.socket.read.timeout";
+ public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
+
+ /**
+ * Thrift quality of protection configuration key. Valid values can be:
+ * auth-conf: authentication, integrity and confidentiality checking
+ * auth-int: authentication and integrity checking
+ * auth: authentication only
+ *
+ * This is used to authenticate the callers and support impersonation.
+ * The thrift server and the HBase cluster must run in secure mode.
+ */
+ public static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
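+ // Illustrative usage (value assumed, not part of this patch):
+ //   conf.set(THRIFT_QOP_KEY, "auth-conf");
+ // i.e. require authentication plus integrity and confidentiality on the wire.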
+
+ public static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
+ public static final int BACKLOG_CONF_DEFAULT = 0;
+
+ public static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
+ public static final String DEFAULT_BIND_ADDR = "0.0.0.0";
+
+ public static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
+ public static final int DEFAULT_LISTEN_PORT = 9090;
+
+ public static final String THRIFT_HTTP_ALLOW_OPTIONS_METHOD =
+ "hbase.thrift.http.allow.options.method";
+ public static final boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false;
+
+ public static final String THRIFT_INFO_SERVER_PORT = "hbase.thrift.info.port";
+ public static final int THRIFT_INFO_SERVER_PORT_DEFAULT = 9095;
+
+ public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS = "hbase.thrift.info.bindAddress";
+ public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT = "0.0.0.0";
+
+ public static final String THRIFT_QUEUE_SIZE = "hbase.thrift.queue.size";
+ public static final int THRIFT_QUEUE_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+ public static final String THRIFT_SELECTOR_NUM = "hbase.thrift.selector.num";
+
+ public static final String THRIFT_FILTERS = "hbase.thrift.filters";
+
+ // Command line options
+
+ public static final String READ_TIMEOUT_OPTION = "readTimeout";
+ public static final String MIN_WORKERS_OPTION = "minWorkers";
+ public static final String MAX_WORKERS_OPTION = "workers";
+ public static final String MAX_QUEUE_SIZE_OPTION = "queue";
+ public static final String SELECTOR_NUM_OPTION = "selectors";
+ public static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
+ public static final String BIND_OPTION = "bind";
+ public static final String COMPACT_OPTION = "compact";
+ public static final String FRAMED_OPTION = "framed";
+ public static final String PORT_OPTION = "port";
+ public static final String INFOPORT_OPTION = "infoport";
+
+ //for thrift2 server
+ public static final String READONLY_OPTION = "readonly";
+
+ public static final String THRIFT_READONLY_ENABLED = "hbase.thrift.readonly";
+ public static final boolean THRIFT_READONLY_ENABLED_DEFAULT = false;
+}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
new file mode 100644
index 00000000000..799087149c1
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConnectionCache;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Abstract base class for HBase service handlers, providing a connection
+ * cache and convenience methods for obtaining Table and Admin instances.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public abstract class HBaseServiceHandler {
+ public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
+ public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
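+ // The constructor below defaults these to a 10 second cleanup interval and a
+ // 10 minute maximum idle time for cached connections.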
+
+ protected Configuration conf;
+
+ protected final ConnectionCache connectionCache;
+
+ public HBaseServiceHandler(final Configuration c,
+ final UserProvider userProvider) throws IOException {
+ this.conf = c;
+ int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
+ int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
+ connectionCache = new ConnectionCache(
+ conf, userProvider, cleanInterval, maxIdleTime);
+ }
+
+ protected ThriftMetrics metrics = null;
+
+ public void initMetrics(ThriftMetrics metrics) {
+ this.metrics = metrics;
+ }
+
+ public void setEffectiveUser(String effectiveUser) {
+ connectionCache.setEffectiveUser(effectiveUser);
+ }
+
+ /**
+ * Obtain an Admin instance. Creates the instance if it is not already created.
+ */
+ protected Admin getAdmin() throws IOException {
+ return connectionCache.getAdmin();
+ }
+
+ /**
+ * Creates and returns a Table instance from a given table name.
+ *
+   * @param tableName name of table
+ * @return Table object
+ * @throws IOException if getting the table fails
+ */
+ protected Table getTable(final byte[] tableName) throws IOException {
+ String table = Bytes.toString(tableName);
+ return connectionCache.getTable(table);
+ }
+
+ protected Table getTable(final ByteBuffer tableName) throws IOException {
+ return getTable(Bytes.getBytes(tableName));
+ }
+}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
index 5a6e436467d..1402f8697e8 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
@@ -25,9 +25,8 @@ import java.lang.reflect.Proxy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Converts a Hbase.Iface using InvocationHandler so that it reports process
@@ -36,10 +35,7 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public final class HbaseHandlerMetricsProxy implements InvocationHandler {
- private static final Logger LOG = LoggerFactory.getLogger(
- HbaseHandlerMetricsProxy.class);
-
- private final Hbase.Iface handler;
+ private final Object handler;
private final ThriftMetrics metrics;
public static Hbase.Iface newInstance(Hbase.Iface handler,
@@ -51,8 +47,18 @@ public final class HbaseHandlerMetricsProxy implements InvocationHandler {
new HbaseHandlerMetricsProxy(handler, metrics, conf));
}
+ // for thrift 2
+ public static THBaseService.Iface newInstance(THBaseService.Iface handler,
+ ThriftMetrics metrics,
+ Configuration conf) {
+ return (THBaseService.Iface) Proxy.newProxyInstance(
+ handler.getClass().getClassLoader(),
+ new Class[]{THBaseService.Iface.class},
+ new HbaseHandlerMetricsProxy(handler, metrics, conf));
+ }
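+ // Usage sketch (illustrative caller, not part of this patch):
+ //   THBaseService.Iface proxied =
+ //       HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
+ // Calls on the returned proxy go through invoke(), which reports the process
+ // time of each call to ThriftMetrics before delegating to the wrapped handler.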
+
private HbaseHandlerMetricsProxy(
- Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) {
+ Object handler, ThriftMetrics metrics, Configuration conf) {
this.handler = handler;
this.metrics = metrics;
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java
new file mode 100644
index 00000000000..7108115173b
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import static org.apache.hadoop.hbase.thrift.Constants.SERVER_TYPE_CONF_KEY;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
+
+/** An enum of server implementation selections */
+@InterfaceAudience.Private
+public enum ImplType {
+ HS_HA("hsha", true, THsHaServer.class, true),
+ NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
+ THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
+ THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true);
+
+ private static final Logger LOG = LoggerFactory.getLogger(ImplType.class);
+ public static final ImplType DEFAULT = THREAD_POOL;
+
+ final String option;
+ final boolean isAlwaysFramed;
+ final Class<? extends TServer> serverClass;
+ final boolean canSpecifyBindIP;
+
+ private ImplType(String option, boolean isAlwaysFramed,
+ Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
+ this.option = option;
+ this.isAlwaysFramed = isAlwaysFramed;
+ this.serverClass = serverClass;
+ this.canSpecifyBindIP = canSpecifyBindIP;
+ }
+
+ /**
+ * @return <code>-option</code>
+ */
+ @Override
+ public String toString() {
+ return "-" + option;
+ }
+
+ public String getOption() {
+ return option;
+ }
+
+ public boolean isAlwaysFramed() {
+ return isAlwaysFramed;
+ }
+
+ public String getDescription() {
+ StringBuilder sb = new StringBuilder("Use the " +
+ serverClass.getSimpleName() + ".");
+ if (isAlwaysFramed) {
+ sb.append(" This implies the framed transport.");
+ }
+ if (this == DEFAULT) {
+ sb.append("This is the default.");
+ }
+ return sb.toString();
+ }
+
+ static OptionGroup createOptionGroup() {
+ OptionGroup group = new OptionGroup();
+ for (ImplType t : values()) {
+ group.addOption(new Option(t.option, t.getDescription()));
+ }
+ return group;
+ }
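+ // Illustrative command lines (launcher assumed, not part of this patch):
+ //   hbase thrift start -threadpool   // or -hsha, -nonblocking, -threadedselector
+ // setServerImpl below stores the chosen type under SERVER_TYPE_CONF_KEY and
+ // rejects command lines that name more than one server type.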
+
+ public static ImplType getServerImpl(Configuration conf) {
+ String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
+ for (ImplType t : values()) {
+ if (confType.equals(t.option)) {
+ return t;
+ }
+ }
+ throw new AssertionError("Unknown server ImplType.option:" + confType);
+ }
+
+ static void setServerImpl(CommandLine cmd, Configuration conf) {
+ ImplType chosenType = null;
+ int numChosen = 0;
+ for (ImplType t : values()) {
+ if (cmd.hasOption(t.option)) {
+ chosenType = t;
+ ++numChosen;
+ }
+ }
+ if (numChosen < 1) {
+ LOG.info("Using default thrift server type");
+ chosenType = DEFAULT;
+ } else if (numChosen > 1) {
+ throw new AssertionError("Exactly one option out of " +
+ Arrays.toString(values()) + " has to be specified");
+ }
+ LOG.info("Using thrift server type " + chosenType.option);
+ conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
+ }
+
+ public String simpleClassName() {
+ return serverClass.getSimpleName();
+ }
+
+ public static List<String> serversThatCannotSpecifyBindIP() {
+ List<String> l = new ArrayList<>();
+ for (ImplType t : values()) {
+ if (!t.canSpecifyBindIP) {
+ l.add(t.simpleClassName());
+ }
+ }
+ return l;
+ }
+}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index e36d6391be3..971cd1708f8 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -180,7 +179,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
new ConcurrentHashMap<>(100000, 0.75f, 1500);
private final ThreadPoolExecutor pool;
- private final HBaseHandler handler;
+ private final ThriftHBaseServiceHandler handler;
private int maxQueueSize = 500000;
private static final int CORE_POOL_SIZE = 1;
@@ -188,7 +187,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class);
@SuppressWarnings("deprecation")
- public IncrementCoalescer(HBaseHandler hand) {
+ public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
this.handler = hand;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
pool =
@@ -230,6 +229,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
inc.getAmmount());
}
+ @SuppressWarnings("FutureReturnValueIgnored")
private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
byte[] qual, long ammount) throws TException {
int countersMapSize = countersMap.size();
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
new file mode 100644
index 00000000000..34bf5e8705b
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
@@ -0,0 +1,1347 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
+import static org.apache.hadoop.hbase.util.Bytes.getBytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
+import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
+import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
+import org.apache.hadoop.hbase.thrift.generated.TAppend;
+import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TIncrement;
+import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.hbase.thrift.generated.TScan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.thrift.TException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+/**
+ * The ThriftHBaseServiceHandler is a glue object that connects Thrift RPC calls to the
+ * HBase client API, primarily defined in the Admin and Table objects.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
+public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface {
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
+
+ public static final int HREGION_VERSION = 1;
+
+ // nextScannerId and scannerMap are used to manage scanner state
+ private int nextScannerId = 0;
+ private HashMap<Integer, ResultScannerWrapper> scannerMap;
+ IncrementCoalescer coalescer;
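+ // The coalescer is consulted only when hbase.regionserver.thrift.coalesceIncrement
+ // is set to true; see increment(TIncrement) below.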
+
+ /**
+ * Returns a list of all the column families for a given Table.
+ *
+ * @param table table to fetch the column families from
+ * @throws IOException if fetching the table descriptor fails
+ */
+ byte[][] getAllColumns(Table table) throws IOException {
+ HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
+ byte[][] columns = new byte[cds.length][];
+ for (int i = 0; i < cds.length; i++) {
+ columns[i] = Bytes.add(cds[i].getName(),
+ KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
+ }
+ return columns;
+ }
+
+ /**
+ * Assigns a unique ID to the scanner and adds the mapping to an internal
+ * hash-map.
+ *
+ * @param scanner the {@link ResultScanner} to add
+ * @return integer scanner id
+ */
+ protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
+ int id = nextScannerId++;
+ ResultScannerWrapper resultScannerWrapper =
+ new ResultScannerWrapper(scanner, sortColumns);
+ scannerMap.put(id, resultScannerWrapper);
+ return id;
+ }
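+ // Note: scanner ids are process-local, and an entry stays in scannerMap until
+ // the client calls scannerClose(int); there is no idle eviction in this class.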
+
+ /**
+ * Returns the scanner associated with the specified ID.
+ *
+ * @param id the ID of the scanner to get
+ * @return a Scanner, or null if ID was invalid.
+ */
+ private synchronized ResultScannerWrapper getScanner(int id) {
+ return scannerMap.get(id);
+ }
+
+ /**
+ * Removes the scanner associated with the specified ID from the internal
+ * id->scanner hash-map.
+ *
+ * @param id the ID of the scanner to remove
+ * @return a Scanner, or null if ID was invalid.
+ */
+ private synchronized ResultScannerWrapper removeScanner(int id) {
+ return scannerMap.remove(id);
+ }
+
+ protected ThriftHBaseServiceHandler(final Configuration c,
+ final UserProvider userProvider) throws IOException {
+ super(c, userProvider);
+ scannerMap = new HashMap<>();
+ this.coalescer = new IncrementCoalescer(this);
+ }
+
+ @Override
+ public void enableTable(ByteBuffer tableName) throws IOError {
+ try {
+ getAdmin().enableTable(getTableName(tableName));
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ @Override
+ public void disableTable(ByteBuffer tableName) throws IOError {
+ try {
+ getAdmin().disableTable(getTableName(tableName));
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ @Override
+ public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
+ try {
+ return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ // ThriftHBaseServiceHandler.compact should be deprecated and replaced with methods
+ // specific to table and region.
+ @Override
+ public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
+ try {
+ try {
+ getAdmin().compactRegion(getBytes(tableNameOrRegionName));
+ } catch (IllegalArgumentException e) {
+ // Invalid region, try table
+ getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ // ThriftHBaseServiceHandler.majorCompact should be deprecated and replaced with methods
+ // specific to table and region.
+ @Override
+ public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
+ try {
+ try {
+ getAdmin().majorCompactRegion(getBytes(tableNameOrRegionName));
+ } catch (IllegalArgumentException e) {
+ // Invalid region, try table
+ getAdmin().majorCompact(TableName.valueOf(getBytes(tableNameOrRegionName)));
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ @Override
+ public List<ByteBuffer> getTableNames() throws IOError {
+ try {
+ TableName[] tableNames = this.getAdmin().listTableNames();
+ ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
+ for (TableName tableName : tableNames) {
+ list.add(ByteBuffer.wrap(tableName.getName()));
+ }
+ return list;
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ /**
+ * @return the list of regions in the given table, or an empty list if the table does not exist
+ */
+ @Override
+ public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
+ try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
+ List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
+ List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
+ for (HRegionLocation regionLocation : regionLocations) {
+ RegionInfo info = regionLocation.getRegionInfo();
+ ServerName serverName = regionLocation.getServerName();
+ TRegionInfo region = new TRegionInfo();
+ region.serverName = ByteBuffer.wrap(
+ Bytes.toBytes(serverName.getHostname()));
+ region.port = serverName.getPort();
+ region.startKey = ByteBuffer.wrap(info.getStartKey());
+ region.endKey = ByteBuffer.wrap(info.getEndKey());
+ region.id = info.getRegionId();
+ region.name = ByteBuffer.wrap(info.getRegionName());
+ region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
+ results.add(region);
+ }
+ return results;
+ } catch (TableNotFoundException e) {
+ // Return empty list for non-existing table
+ return Collections.emptyList();
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ @Override
+ public List<TCell> get(
+ ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError {
+ byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ if (famAndQf.length == 1) {
+ return get(tableName, row, famAndQf[0], null, attributes);
+ }
+ if (famAndQf.length == 2) {
+ return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
+ }
+ throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+ }
+
+ /**
+ * Note: this internal interface is slightly different from the public APIs in regard to
+ * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
+ * Rather, we respect qual == null as a request for the entire column family. The caller
+ * ({@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that
+ * the column is parsed as usual.
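+ * <p>An illustrative sketch (hypothetical arguments, not part of this patch):
+ * <pre>
+ * get(table, row, family, null, attributes);        // requests the whole column family
+ * get(table, row, family, new byte[0], attributes); // requests only the empty qualifier
+ * </pre>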
+ */
+ protected List<TCell> get(ByteBuffer tableName,
+ ByteBuffer row,
+ byte[] family,
+ byte[] qualifier,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
+ if (qualifier == null) {
+ get.addFamily(family);
+ } else {
+ get.addColumn(family, qualifier);
+ }
+ Result result = table.get(get);
+ return ThriftUtilities.cellFromHBase(result.rawCells());
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+ int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ if(famAndQf.length == 1) {
+ return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
+ }
+ if (famAndQf.length == 2) {
+ return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
+ }
+ throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+ }
+
+ /**
+ * Note: this public interface is slightly different from public Java APIs in regard to
+ * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
+ * Rather, we respect qual == null as a request for the entire column family. If you want to
+ * access the entire column family, use
+ * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
+ * that lacks a {@code ':'}.
+ */
+ public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
+ byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
+ if (null == qualifier) {
+ get.addFamily(family);
+ } else {
+ get.addColumn(family, qualifier);
+ }
+ get.setMaxVersions(numVersions);
+ Result result = table.get(get);
+ return ThriftUtilities.cellFromHBase(result.rawCells());
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+ long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ if (famAndQf.length == 1) {
+ return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
+ }
+ if (famAndQf.length == 2) {
+ return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
+ attributes);
+ }
+ throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+ }
+
+ /**
+ * Note: this internal interface is slightly different from the public APIs in regard to
+ * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
+ * Rather, we respect qual == null as a request for the entire column family. The caller
+ * ({@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
+ * consistent in that the column is parsed as usual.
+ */
+ protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
+ byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
+ if (null == qualifier) {
+ get.addFamily(family);
+ } else {
+ get.addColumn(family, qualifier);
+ }
+ get.setTimeRange(0, timestamp);
+ get.setMaxVersions(numVersions);
+ Result result = table.get(get);
+ return ThriftUtilities.cellFromHBase(result.rawCells());
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ return getRowWithColumnsTs(tableName, row, null,
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
+ }
+
+ @Override
+ public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
+ ByteBuffer row,
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ return getRowWithColumnsTs(tableName, row, columns,
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
+ }
+
+ @Override
+ public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
+ long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ return getRowWithColumnsTs(tableName, row, null,
+ timestamp, attributes);
+ }
+
+ @Override
+ public List<TRowResult> getRowWithColumnsTs(
+ ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
+ long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ if (columns == null) {
+ Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
+ get.setTimeRange(0, timestamp);
+ Result result = table.get(get);
+ return ThriftUtilities.rowResultFromHBase(result);
+ }
+ Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
+ for(ByteBuffer column : columns) {
+ byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ if (famAndQf.length == 1) {
+ get.addFamily(famAndQf[0]);
+ } else {
+ get.addColumn(famAndQf[0], famAndQf[1]);
+ }
+ }
+ get.setTimeRange(0, timestamp);
+ Result result = table.get(get);
+ return ThriftUtilities.rowResultFromHBase(result);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public List<TRowResult> getRows(ByteBuffer tableName,
+ List<ByteBuffer> rows,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError {
+ return getRowsWithColumnsTs(tableName, rows, null,
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
+ }
+
+ @Override
+ public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
+ List<ByteBuffer> rows,
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ return getRowsWithColumnsTs(tableName, rows, columns,
+ HConstants.LATEST_TIMESTAMP,
+ attributes);
+ }
+
+ @Override
+ public List<TRowResult> getRowsTs(ByteBuffer tableName,
+ List<ByteBuffer> rows,
+ long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ return getRowsWithColumnsTs(tableName, rows, null,
+ timestamp, attributes);
+ }
+
+ @Override
+ public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
+ List<ByteBuffer> rows,
+ List<ByteBuffer> columns, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table = null;
+ try {
+ List<Get> gets = new ArrayList<>(rows.size());
+ table = getTable(tableName);
+ if (metrics != null) {
+ metrics.incNumRowKeysInBatchGet(rows.size());
+ }
+ for (ByteBuffer row : rows) {
+ Get get = new Get(getBytes(row));
+ addAttributes(get, attributes);
+ if (columns != null) {
+ for(ByteBuffer column : columns) {
+ byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ if (famAndQf.length == 1) {
+ get.addFamily(famAndQf[0]);
+ } else {
+ get.addColumn(famAndQf[0], famAndQf[1]);
+ }
+ }
+ }
+ get.setTimeRange(0, timestamp);
+ gets.add(get);
+ }
+ Result[] result = table.get(gets);
+ return ThriftUtilities.rowResultFromHBase(result);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public void deleteAll(
+ ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError {
+ deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
+ attributes);
+ }
+
+ @Override
+ public void deleteAllTs(ByteBuffer tableName,
+ ByteBuffer row,
+ ByteBuffer column,
+ long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Delete delete = new Delete(getBytes(row));
+ addAttributes(delete, attributes);
+ byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ if (famAndQf.length == 1) {
+ delete.addFamily(famAndQf[0], timestamp);
+ } else {
+ delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
+ }
+ table.delete(delete);
+
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public void deleteAllRow(
+ ByteBuffer tableName, ByteBuffer row,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
+ }
+
+ @Override
+ public void deleteAllRowTs(
+ ByteBuffer tableName, ByteBuffer row, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Delete delete = new Delete(getBytes(row), timestamp);
+ addAttributes(delete, attributes);
+ table.delete(delete);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public void createTable(ByteBuffer in_tableName,
+ List<ColumnDescriptor> columnFamilies) throws IOError, IllegalArgument, AlreadyExists {
+ TableName tableName = getTableName(in_tableName);
+ try {
+ if (getAdmin().tableExists(tableName)) {
+ throw new AlreadyExists("table name already in use");
+ }
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ for (ColumnDescriptor col : columnFamilies) {
+ HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
+ desc.addFamily(colDesc);
+ }
+ getAdmin().createTable(desc);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } catch (IllegalArgumentException e) {
+ LOG.warn(e.getMessage(), e);
+ throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ }
+ }
+
+ private static TableName getTableName(ByteBuffer buffer) {
+ return TableName.valueOf(getBytes(buffer));
+ }
+
+ @Override
+ public void deleteTable(ByteBuffer in_tableName) throws IOError {
+ TableName tableName = getTableName(in_tableName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deleteTable: table={}", tableName);
+ }
+ try {
+ if (!getAdmin().tableExists(tableName)) {
+ throw new IOException("table does not exist");
+ }
+ getAdmin().deleteTable(tableName);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ @Override
+ public void mutateRow(ByteBuffer tableName, ByteBuffer row,
+ List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, IllegalArgument {
+ mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
+ }
+
+ @Override
+ public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
+ List<Mutation> mutations, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, IllegalArgument {
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Put put = new Put(getBytes(row), timestamp);
+ addAttributes(put, attributes);
+
+ Delete delete = new Delete(getBytes(row));
+ addAttributes(delete, attributes);
+ if (metrics != null) {
+ metrics.incNumRowKeysInBatchMutate(mutations.size());
+ }
+
+ // I apologize for all this mess :)
+ CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
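+ // Each Thrift Mutation below contributes either a cell to the single Put or an
+ // entry to the single Delete; the Delete is applied before the Put further down.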
+ for (Mutation m : mutations) {
+ byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
+ if (m.isDelete) {
+ if (famAndQf.length == 1) {
+ delete.addFamily(famAndQf[0], timestamp);
+ } else {
+ delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
+ }
+ delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ } else {
+ if(famAndQf.length == 1) {
+ LOG.warn("No column qualifier specified. Delete is the only mutation supported "
+ + "over the whole column family.");
+ } else {
+ put.add(builder.clear()
+ .setRow(put.getRow())
+ .setFamily(famAndQf[0])
+ .setQualifier(famAndQf[1])
+ .setTimestamp(put.getTimestamp())
+ .setType(Cell.Type.Put)
+ .setValue(m.value != null ? getBytes(m.value)
+ : HConstants.EMPTY_BYTE_ARRAY)
+ .build());
+ }
+ put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ }
+ }
+ if (!delete.isEmpty()) {
+ table.delete(delete);
+ }
+ if (!put.isEmpty()) {
+ table.put(put);
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } catch (IllegalArgumentException e) {
+ LOG.warn(e.getMessage(), e);
+ throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, IllegalArgument, TException {
+ mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
+ }
+
+ @Override
+ public void mutateRowsTs(
+ ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, IllegalArgument, TException {
+ List<Put> puts = new ArrayList<>();
+ List<Delete> deletes = new ArrayList<>();
+ CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+ for (BatchMutation batch : rowBatches) {
+ byte[] row = getBytes(batch.row);
+ List<Mutation> mutations = batch.mutations;
+ Delete delete = new Delete(row);
+ addAttributes(delete, attributes);
+ Put put = new Put(row, timestamp);
+ addAttributes(put, attributes);
+ for (Mutation m : mutations) {
+ byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
+ if (m.isDelete) {
+ // no qualifier, family only.
+ if (famAndQf.length == 1) {
+ delete.addFamily(famAndQf[0], timestamp);
+ } else {
+ delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
+ }
+ delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
+ : Durability.SKIP_WAL);
+ } else {
+ if (famAndQf.length == 1) {
+ LOG.warn("No column qualifier specified. Delete is the only mutation supported "
+ + "over the whole column family.");
+ }
+ if (famAndQf.length == 2) {
+ try {
+ put.add(builder.clear()
+ .setRow(put.getRow())
+ .setFamily(famAndQf[0])
+ .setQualifier(famAndQf[1])
+ .setTimestamp(put.getTimestamp())
+ .setType(Cell.Type.Put)
+ .setValue(m.value != null ? getBytes(m.value)
+ : HConstants.EMPTY_BYTE_ARRAY)
+ .build());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid famAndQf provided.");
+ }
+ put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ }
+ }
+ if (!delete.isEmpty()) {
+ deletes.add(delete);
+ }
+ if (!put.isEmpty()) {
+ puts.add(put);
+ }
+ }
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ if (!puts.isEmpty()) {
+ table.put(puts);
+ }
+ if (!deletes.isEmpty()) {
+ table.delete(deletes);
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } catch (IllegalArgumentException e) {
+ LOG.warn(e.getMessage(), e);
+ throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public long atomicIncrement(
+ ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
+ throws IOError, IllegalArgument, TException {
+ byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ if(famAndQf.length == 1) {
+ return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
+ }
+ return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
+ }
+
+ protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
+ byte [] family, byte [] qualifier, long amount)
+ throws IOError, IllegalArgument, TException {
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ return table.incrementColumnValue(
+ getBytes(row), family, qualifier, amount);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public void scannerClose(int id) throws IOError, IllegalArgument {
+ LOG.debug("scannerClose: id={}", id);
+ ResultScannerWrapper resultScannerWrapper = getScanner(id);
+ if (resultScannerWrapper == null) {
+ LOG.warn("scanner ID is invalid");
+ throw new IllegalArgument("scanner ID is invalid");
+ }
+ resultScannerWrapper.getScanner().close();
+ removeScanner(id);
+ }
+
+ @Override
+ public List<TRowResult> scannerGetList(int id, int nbRows)
+ throws IllegalArgument, IOError {
+ LOG.debug("scannerGetList: id={}", id);
+ ResultScannerWrapper resultScannerWrapper = getScanner(id);
+ if (null == resultScannerWrapper) {
+ String message = "scanner ID is invalid";
+ LOG.warn(message);
+ throw new IllegalArgument("scanner ID is invalid");
+ }
+
+ Result [] results;
+ try {
+ results = resultScannerWrapper.getScanner().next(nbRows);
+ if (null == results) {
+ return new ArrayList<>();
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
+ }
+
+ @Override
+ public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
+ return scannerGetList(id, 1);
+ }
+
+ @Override
+ public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Scan scan = new Scan();
+ addAttributes(scan, attributes);
+ if (tScan.isSetStartRow()) {
+ scan.setStartRow(tScan.getStartRow());
+ }
+ if (tScan.isSetStopRow()) {
+ scan.setStopRow(tScan.getStopRow());
+ }
+ if (tScan.isSetTimestamp()) {
+ scan.setTimeRange(0, tScan.getTimestamp());
+ }
+ if (tScan.isSetCaching()) {
+ scan.setCaching(tScan.getCaching());
+ }
+ if (tScan.isSetBatchSize()) {
+ scan.setBatch(tScan.getBatchSize());
+ }
+ if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
+ for(ByteBuffer column : tScan.getColumns()) {
+ byte [][] famQf = CellUtil.parseColumn(getBytes(column));
+ if(famQf.length == 1) {
+ scan.addFamily(famQf[0]);
+ } else {
+ scan.addColumn(famQf[0], famQf[1]);
+ }
+ }
+ }
+ if (tScan.isSetFilterString()) {
+ ParseFilter parseFilter = new ParseFilter();
+ scan.setFilter(
+ parseFilter.parseFilterString(tScan.getFilterString()));
+ }
+ if (tScan.isSetReversed()) {
+ scan.setReversed(tScan.isReversed());
+ }
+ if (tScan.isSetCacheBlocks()) {
+ scan.setCacheBlocks(tScan.isCacheBlocks());
+ }
+ return addScanner(table.getScanner(scan), tScan.sortColumns);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Scan scan = new Scan(getBytes(startRow));
+ addAttributes(scan, attributes);
+ if(columns != null && !columns.isEmpty()) {
+ for(ByteBuffer column : columns) {
+ byte [][] famQf = CellUtil.parseColumn(getBytes(column));
+ if(famQf.length == 1) {
+ scan.addFamily(famQf[0]);
+ } else {
+ scan.addColumn(famQf[0], famQf[1]);
+ }
+ }
+ }
+ return addScanner(table.getScanner(scan), false);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
+ ByteBuffer stopRow, List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, TException {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+ addAttributes(scan, attributes);
+ if(columns != null && !columns.isEmpty()) {
+ for(ByteBuffer column : columns) {
+ byte [][] famQf = CellUtil.parseColumn(getBytes(column));
+ if(famQf.length == 1) {
+ scan.addFamily(famQf[0]);
+ } else {
+ scan.addColumn(famQf[0], famQf[1]);
+ }
+ }
+ }
+ return addScanner(table.getScanner(scan), false);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public int scannerOpenWithPrefix(ByteBuffer tableName,
+ ByteBuffer startAndPrefix,
+ List<ByteBuffer> columns,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, TException {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Scan scan = new Scan(getBytes(startAndPrefix));
+ addAttributes(scan, attributes);
+ Filter f = new WhileMatchFilter(
+ new PrefixFilter(getBytes(startAndPrefix)));
+ scan.setFilter(f);
+ if (columns != null && !columns.isEmpty()) {
+ for(ByteBuffer column : columns) {
+ byte [][] famQf = CellUtil.parseColumn(getBytes(column));
+ if(famQf.length == 1) {
+ scan.addFamily(famQf[0]);
+ } else {
+ scan.addColumn(famQf[0], famQf[1]);
+ }
+ }
+ }
+ return addScanner(table.getScanner(scan), false);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
+ List<ByteBuffer> columns, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Scan scan = new Scan(getBytes(startRow));
+ addAttributes(scan, attributes);
+ scan.setTimeRange(0, timestamp);
+ if (columns != null && !columns.isEmpty()) {
+ for (ByteBuffer column : columns) {
+ byte [][] famQf = CellUtil.parseColumn(getBytes(column));
+ if(famQf.length == 1) {
+ scan.addFamily(famQf[0]);
+ } else {
+ scan.addColumn(famQf[0], famQf[1]);
+ }
+ }
+ }
+ return addScanner(table.getScanner(scan), false);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
+ ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
+ Map<ByteBuffer, ByteBuffer> attributes)
+ throws IOError, TException {
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+ addAttributes(scan, attributes);
+ scan.setTimeRange(0, timestamp);
+ if (columns != null && !columns.isEmpty()) {
+ for (ByteBuffer column : columns) {
+ byte [][] famQf = CellUtil.parseColumn(getBytes(column));
+ if(famQf.length == 1) {
+ scan.addFamily(famQf[0]);
+ } else {
+ scan.addColumn(famQf[0], famQf[1]);
+ }
+ }
+ }
+ return addScanner(table.getScanner(scan), false);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
+ ByteBuffer tableName) throws IOError, TException {
+
+ Table table = null;
+ try {
+ TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
+
+ table = getTable(tableName);
+ HTableDescriptor desc = table.getTableDescriptor();
+
+ for (HColumnDescriptor e : desc.getFamilies()) {
+ ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
+ columns.put(col.name, col);
+ }
+ return columns;
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ private void closeTable(Table table) throws IOError {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ @Override
+ public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+ try {
+ byte[] row = getBytes(searchRow);
+ Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
+ HConstants.CATALOG_FAMILY);
+
+ if (startRowResult == null) {
+ throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
+ + Bytes.toStringBinary(row));
+ }
+
+ // find region start and end keys
+ RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult);
+ if (regionInfo == null) {
+ throw new IOException("RegionInfo REGIONINFO was null or " +
+ " empty in Meta for row="
+ + Bytes.toStringBinary(row));
+ }
+ TRegionInfo region = new TRegionInfo();
+ region.setStartKey(regionInfo.getStartKey());
+ region.setEndKey(regionInfo.getEndKey());
+ region.id = regionInfo.getRegionId();
+ region.setName(regionInfo.getRegionName());
+ region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
+
+ // find region assignment to server
+ ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
+ if (serverName != null) {
+ region.setServerName(Bytes.toBytes(serverName.getHostname()));
+ region.port = serverName.getPort();
+ }
+ return region;
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ }
+ }
+
+ private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
+ throws IOException {
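+ // A reversed scan starting at the given row returns the closest hbase:meta row at
+ // or before it, i.e. the catalog entry of the region containing that row.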
+ Scan scan = new Scan(row);
+ scan.setReversed(true);
+ scan.addFamily(family);
+ scan.setStartRow(row);
+ try (Table table = getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ return scanner.next();
+ }
+ }
+
+ @Override
+ public void increment(TIncrement tincrement) throws IOError, TException {
+
+ if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
+ throw new TException("Must supply a table and a row key; can't increment");
+ }
+
+ if (conf.getBoolean(COALESCE_INC_KEY, false)) {
+ this.coalescer.queueIncrement(tincrement);
+ return;
+ }
+
+ Table table = null;
+ try {
+ table = getTable(tincrement.getTable());
+ Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
+ table.increment(inc);
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public void incrementRows(List tincrements) throws IOError, TException {
+ if (conf.getBoolean(COALESCE_INC_KEY, false)) {
+ this.coalescer.queueIncrements(tincrements);
+ return;
+ }
+ for (TIncrement tinc : tincrements) {
+ increment(tinc);
+ }
+ }
+
+ @Override
+ public List<TCell> append(TAppend tappend) throws IOError, TException {
+ if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
+ throw new TException("Must supply a table and a row key; can't append");
+ }
+
+ Table table = null;
+ try {
+ table = getTable(tappend.getTable());
+ Append append = ThriftUtilities.appendFromThrift(tappend);
+ Result result = table.append(append);
+ return ThriftUtilities.cellFromHBase(result.rawCells());
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ @Override
+ public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+ ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
+ IllegalArgument, TException {
+ Put put;
+ try {
+ put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
+ addAttributes(put, attributes);
+
+ byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
+ put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setRow(put.getRow())
+ .setFamily(famAndQf[0])
+ .setQualifier(famAndQf[1])
+ .setTimestamp(put.getTimestamp())
+ .setType(Cell.Type.Put)
+ .setValue(mput.value != null ? getBytes(mput.value)
+ : HConstants.EMPTY_BYTE_ARRAY)
+ .build());
+ put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+ } catch (IOException | IllegalArgumentException e) {
+ LOG.warn(e.getMessage(), e);
+ throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ }
+
+ Table table = null;
+ try {
+ table = getTable(tableName);
+ byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
+ Table.CheckAndMutateBuilder mutateBuilder =
+ table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
+ if (value != null) {
+ return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
+ } else {
+ return mutateBuilder.ifNotExists().thenPut(put);
+ }
+ } catch (IOException e) {
+ LOG.warn(e.getMessage(), e);
+ throw getIOError(e);
+ } catch (IllegalArgumentException e) {
+ LOG.warn(e.getMessage(), e);
+ throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+ } finally {
+ closeTable(table);
+ }
+ }
+
+ private static IOError getIOError(Throwable throwable) {
+ IOError error = new IOErrorWithCause(throwable);
+ error.setMessage(Throwables.getStackTraceAsString(throwable));
+ return error;
+ }
+
+ /**
+ * Adds all the attributes into the Operation object
+ */
+ private static void addAttributes(OperationWithAttributes op,
+ Map<ByteBuffer, ByteBuffer> attributes) {
+ if (attributes == null || attributes.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
+ String name = Bytes.toStringBinary(getBytes(entry.getKey()));
+ byte[] value = getBytes(entry.getValue());
+ op.setAttribute(name, value);
+ }
+ }
+
+ protected static class ResultScannerWrapper {
+
+ private final ResultScanner scanner;
+ private final boolean sortColumns;
+ public ResultScannerWrapper(ResultScanner resultScanner,
+ boolean sortResultColumns) {
+ scanner = resultScanner;
+ sortColumns = sortResultColumns;
+ }
+
+ public ResultScanner getScanner() {
+ return scanner;
+ }
+
+ public boolean isColumnSorted() {
+ return sortColumns;
+ }
+ }
+
+ public static class IOErrorWithCause extends IOError {
+ private final Throwable cause;
+ public IOErrorWithCause(Throwable cause) {
+ this.cause = cause;
+ }
+
+ @Override
+ public synchronized Throwable getCause() {
+ return cause;
+ }
+
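+ // equals()/hashCode() also take the wrapped cause into account, on top of the Thrift
+ // fields compared by super.equals().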
+ @Override
+ public boolean equals(Object other) {
+ if (super.equals(other) &&
+ other instanceof IOErrorWithCause) {
+ Throwable otherCause = ((IOErrorWithCause) other).getCause();
+ if (this.getCause() != null) {
+ return otherCause != null && this.getCause().equals(otherCause);
+ } else {
+ return otherCause == null;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (cause != null ? cause.hashCode() : 0);
+ return result;
+ }
+ }
+
+}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java
index 4c9a35b2193..7f851525d9c 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java
@@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.thrift;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_PRINCIPAL_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -58,7 +58,7 @@ public class ThriftHttpServlet extends TServlet {
private static final Logger LOG = LoggerFactory.getLogger(ThriftHttpServlet.class.getName());
private final transient UserGroupInformation serviceUGI;
private final transient UserGroupInformation httpUGI;
- private final transient ThriftServerRunner.HBaseHandler hbaseHandler;
+ private final transient HBaseServiceHandler handler;
private final boolean doAsEnabled;
private final boolean securityEnabled;
@@ -67,11 +67,11 @@ public class ThriftHttpServlet extends TServlet {
public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
UserGroupInformation serviceUGI, Configuration conf,
- ThriftServerRunner.HBaseHandler hbaseHandler, boolean securityEnabled, boolean doAsEnabled)
+ HBaseServiceHandler handler, boolean securityEnabled, boolean doAsEnabled)
throws IOException {
super(processor, protocolFactory);
this.serviceUGI = serviceUGI;
- this.hbaseHandler = hbaseHandler;
+ this.handler = handler;
this.securityEnabled = securityEnabled;
this.doAsEnabled = doAsEnabled;
@@ -146,7 +146,7 @@ public class ThriftHttpServlet extends TServlet {
}
effectiveUser = doAsUserFromQuery;
}
- hbaseHandler.setEffectiveUser(effectiveUser);
+ handler.setEffectiveUser(effectiveUser);
super.doPost(request, response);
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
index fc0032705ef..6d11ac6f267 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
@@ -18,16 +18,132 @@
package org.apache.hadoop.hbase.thrift;
+import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
+import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.http.HttpServerUtil;
import org.apache.hadoop.hbase.http.InfoServer;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityUtil;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServlet;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
import org.apache.yetus.audience.InterfaceAudience;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
@@ -40,29 +156,36 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
* independent process.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class ThriftServer {
+public class ThriftServer extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
- private static final String MIN_WORKERS_OPTION = "minWorkers";
- private static final String MAX_WORKERS_OPTION = "workers";
- private static final String MAX_QUEUE_SIZE_OPTION = "queue";
- private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
- static final String BIND_OPTION = "bind";
- static final String COMPACT_OPTION = "compact";
- static final String FRAMED_OPTION = "framed";
- static final String PORT_OPTION = "port";
- static final String INFOPORT_OPTION = "infoport";
- private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
- private static final int DEFAULT_LISTEN_PORT = 9090;
- private Configuration conf;
- ThriftServerRunner serverRunner;
+ protected Configuration conf;
- private InfoServer infoServer;
+ protected InfoServer infoServer;
+
+ protected TProcessor processor;
+
+ protected ThriftMetrics metrics;
+ protected HBaseServiceHandler hbaseServiceHandler;
+ protected UserGroupInformation serviceUGI;
+ protected boolean httpEnabled;
+
+ protected SaslUtil.QualityOfProtection qop;
+ protected String host;
+ protected int listenPort;
+
+ protected boolean securityEnabled;
+ protected boolean doAsEnabled;
+
+ protected JvmPauseMonitor pauseMonitor;
+
+ protected volatile TServer tserver;
+ protected volatile Server httpServer;
- private static final String READ_TIMEOUT_OPTION = "readTimeout";
//
// Main program and support routines
@@ -72,7 +195,89 @@ public class ThriftServer {
this.conf = HBaseConfiguration.create(conf);
}
- private static void printUsageAndExit(Options options, int exitCode)
+ protected void setupParameters() throws IOException {
+ // login the server principal (if using secure Hadoop)
+ UserProvider userProvider = UserProvider.instantiate(conf);
+ securityEnabled = userProvider.isHadoopSecurityEnabled()
+ && userProvider.isHBaseSecurityEnabled();
+ if (securityEnabled) {
+ host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+ conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
+ conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
+ userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
+ }
+ this.serviceUGI = userProvider.getCurrent().getUGI();
+
+ this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+ this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
+ this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
+ this.hbaseServiceHandler = createHandler(conf, userProvider);
+ this.hbaseServiceHandler.initMetrics(metrics);
+ this.processor = createProcessor();
+
+ httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
+ doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
+ if (doAsEnabled && !httpEnabled) {
+ LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
+ }
+
+ String strQop = conf.get(THRIFT_QOP_KEY);
+ if (strQop != null) {
+ this.qop = SaslUtil.getQop(strQop);
+ }
+ if (qop != null) {
+ if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
+ qop != SaslUtil.QualityOfProtection.INTEGRITY &&
+ qop != SaslUtil.QualityOfProtection.PRIVACY) {
+ throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
+ THRIFT_QOP_KEY,
+ SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
+ SaslUtil.QualityOfProtection.INTEGRITY.name(),
+ SaslUtil.QualityOfProtection.PRIVACY.name()));
+ }
+ checkHttpSecurity(qop, conf);
+ if (!securityEnabled) {
+ throw new IOException("Thrift server must run in secure mode to support authentication");
+ }
+ }
+ registerFilters(conf);
+ pauseMonitor.start();
+ }
+
+ protected void startInfoServer() throws IOException {
+ // Put up info server.
+ int port = conf.getInt(THRIFT_INFO_SERVER_PORT, THRIFT_INFO_SERVER_PORT_DEFAULT);
+
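+ // A negative configured port disables the info server entirely.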
+ if (port >= 0) {
+ conf.setLong("startcode", System.currentTimeMillis());
+ String a = conf
+ .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
+ infoServer = new InfoServer("thrift", a, port, false, conf);
+ infoServer.setAttribute("hbase.conf", conf);
+ infoServer.start();
+ }
+ }
+
+ protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
+ if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
+ conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
+ !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
+ throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
+ THRIFT_SSL_ENABLED_KEY + " is false");
+ }
+ }
+
+ protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
+ throws IOException {
+ return new ThriftHBaseServiceHandler(conf, userProvider);
+ }
+
+ protected TProcessor createProcessor() {
+ return new Hbase.Processor<>(
+ HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
+ }
+
+ protected void printUsageAndExit(Options options, int exitCode)
throws ExitCodeException {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Thrift", null, options,
@@ -85,32 +290,333 @@ public class ThriftServer {
}
/**
- * Start up or shuts down the Thrift server, depending on the arguments.
- * @param args the arguments to pass in when starting the Thrift server
+ * Set up an HTTP server backed by Jetty to serve calls from THttpClient.
+ *
+ * @throws IOException if the servlet or the server cannot be set up
*/
- void doMain(final String[] args) throws Exception {
- processOptions(args);
- serverRunner = new ThriftServerRunner(conf);
+ protected void setupHTTPServer() throws IOException {
+ TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+ TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
+ conf, hbaseServiceHandler, securityEnabled, doAsEnabled);
- // Put up info server.
- int port = conf.getInt("hbase.thrift.info.port", 9095);
+ // Cap the maximum number of threads at 100 by default to limit the number of
+ // concurrent requests, so that the Thrift HTTP server doesn't OOM easily. Jetty
+ // defaults to a maximum of 250 threads if we don't set it ourselves.
+ //
+ // Our default minimum of 2 threads is the same as Jetty's own default.
+ int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
+ conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
+ HTTP_MIN_THREADS_KEY_DEFAULT));
+ int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
+ conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+ HTTP_MAX_THREADS_KEY_DEFAULT));
+ QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
+ threadPool.setMinThreads(minThreads);
+ httpServer = new Server(threadPool);
- if (port >= 0) {
- conf.setLong("startcode", System.currentTimeMillis());
- String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
- infoServer = new InfoServer("thrift", a, port, false, conf);
- infoServer.setAttribute("hbase.conf", conf);
- infoServer.start();
+ // Context handler
+ ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
+ ServletContextHandler.SESSIONS);
+ ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
+ HttpServerUtil.constrainHttpMethods(ctxHandler,
+ conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
+ THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
+
+ // set up Jetty and run the embedded server
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ httpConfig.setSecureScheme("https");
+ httpConfig.setSecurePort(listenPort);
+ httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+ httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+ httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+ httpConfig.setSendServerVersion(false);
+ httpConfig.setSendDateHeader(false);
+
+ ServerConnector serverConnector;
+ if (conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
+ HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
+ httpsConfig.addCustomizer(new SecureRequestCustomizer());
+
+ SslContextFactory sslCtxFactory = new SslContextFactory();
+ String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
+ String password = HBaseConfiguration.getPassword(conf,
+ THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
+ String keyPassword = HBaseConfiguration.getPassword(conf,
+ THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
+ sslCtxFactory.setKeyStorePath(keystore);
+ sslCtxFactory.setKeyStorePassword(password);
+ sslCtxFactory.setKeyManagerPassword(keyPassword);
+
+ String[] excludeCiphers = conf.getStrings(
+ THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+ if (excludeCiphers.length != 0) {
+ sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
+ }
+ String[] includeCiphers = conf.getStrings(
+ THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+ if (includeCiphers.length != 0) {
+ sslCtxFactory.setIncludeCipherSuites(includeCiphers);
+ }
+
+ // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
+ String[] excludeProtocols = conf.getStrings(
+ THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
+ if (excludeProtocols.length != 0) {
+ sslCtxFactory.setExcludeProtocols(excludeProtocols);
+ }
+ String[] includeProtocols = conf.getStrings(
+ THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+ if (includeProtocols.length != 0) {
+ sslCtxFactory.setIncludeProtocols(includeProtocols);
+ }
+
+ serverConnector = new ServerConnector(httpServer,
+ new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
+ new HttpConnectionFactory(httpsConfig));
+ } else {
+ serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
}
+ serverConnector.setPort(listenPort);
+ serverConnector.setHost(getBindAddress(conf).getHostAddress());
+ httpServer.addConnector(serverConnector);
+ httpServer.setStopAtShutdown(true);
- serverRunner.run();
+ if (doAsEnabled) {
+ ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+ }
+ LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
}
/**
- * Parse the command line options to set parameters the conf.
+ * Set up the Thrift TServer.
*/
- private void processOptions(final String[] args) throws Exception {
- Options options = new Options();
+ protected void setupServer() throws Exception {
+ // Construct correct ProtocolFactory
+ TProtocolFactory protocolFactory = getProtocolFactory();
+
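+ // Pick the Thrift server implementation (nonblocking, hsha, threadedselector or
+ // threadpool) from the configured server type; see ImplType.getServerImpl.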
+ ImplType implType = ImplType.getServerImpl(conf);
+ TProcessor processorToUse = processor;
+
+ // Construct correct TransportFactory
+ TTransportFactory transportFactory;
+ if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
+ if (qop != null) {
+ throw new RuntimeException("Thrift server authentication"
+ + " doesn't work with framed transport yet");
+ }
+ transportFactory = new TFramedTransport.Factory(
+ conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
+ LOG.debug("Using framed transport");
+ } else if (qop == null) {
+ transportFactory = new TTransportFactory();
+ } else {
+ // Extract the name from the principal
+ String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
+ if (thriftKerberosPrincipal == null) {
+ throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
+ }
+ String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
+ Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
+ TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
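+ // The GSSAPI server definition below authorizes a connection only when the SASL
+ // authentication ID matches the authorization ID, and then exposes the short user
+ // name derived from the principal as the authorized ID.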
+ saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
+ new SaslRpcServer.SaslGssCallbackHandler() {
+ @Override
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL GSSAPI Callback");
+ }
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (!authid.equals(authzid)) {
+ ac.setAuthorized(false);
+ } else {
+ ac.setAuthorized(true);
+ String userName = SecurityUtil.getUserFromPrincipal(authzid);
+ LOG.info("Effective user: {}", userName);
+ ac.setAuthorizedID(userName);
+ }
+ }
+ }
+ });
+ transportFactory = saslFactory;
+
+ // Create a processor wrapper, to get the caller
+ processorToUse = (inProt, outProt) -> {
+ TSaslServerTransport saslServerTransport =
+ (TSaslServerTransport)inProt.getTransport();
+ SaslServer saslServer = saslServerTransport.getSaslServer();
+ String principal = saslServer.getAuthorizationID();
+ hbaseServiceHandler.setEffectiveUser(principal);
+ return processor.process(inProt, outProt);
+ };
+ }
+
+ if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+ LOG.error("Server types {} don't support IP address binding at the moment. See " +
+ "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
+ Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
+ throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
+ }
+
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
+ if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+ implType == ImplType.THREADED_SELECTOR) {
+ TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
+ if (implType == ImplType.NONBLOCKING) {
+ tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
+ transportFactory, inetSocketAddress);
+ } else if (implType == ImplType.HS_HA) {
+ tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
+ inetSocketAddress);
+ } else { // THREADED_SELECTOR
+ tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
+ transportFactory, inetSocketAddress);
+ }
+ LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
+ Integer.toString(listenPort));
+ } else if (implType == ImplType.THREAD_POOL) {
+ this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
+ inetSocketAddress);
+ } else {
+ throw new AssertionError("Unsupported Thrift server implementation: " +
+ implType.simpleClassName());
+ }
+
+ // A sanity check that we instantiated the right type of server.
+ if (tserver.getClass() != implType.serverClass) {
+ throw new AssertionError("Expected to create Thrift server class " +
+ implType.serverClass.getName() + " but got " +
+ tserver.getClass().getName());
+ }
+ }
+
+ private TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+ InetSocketAddress inetSocketAddress) {
+ LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
+ serverArgs.processor(processor);
+ serverArgs.transportFactory(transportFactory);
+ serverArgs.protocolFactory(protocolFactory);
+ return new TNonblockingServer(serverArgs);
+ }
+
+ private TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+ InetSocketAddress inetSocketAddress) {
+ LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
+ THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+ int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+ TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
+ CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
+ int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+ serverArgs.getMaxWorkerThreads());
+ ExecutorService executorService = createExecutor(
+ callQueue, workerThread, workerThread);
+ serverArgs.executorService(executorService).processor(processor)
+ .transportFactory(transportFactory).protocolFactory(protocolFactory);
+ return new THsHaServer(serverArgs);
+ }
+
+ private TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+ InetSocketAddress inetSocketAddress) {
+ LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
+ TThreadedSelectorServer.Args serverArgs =
+ new HThreadedSelectorServerArgs(serverTransport, conf);
+ int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+ TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
+ CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
+ int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+ serverArgs.getWorkerThreads());
+ int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
+ serverArgs.selectorThreads(selectorThreads);
+ ExecutorService executorService = createExecutor(
+ callQueue, workerThreads, workerThreads);
+ serverArgs.executorService(executorService).processor(processor)
+ .transportFactory(transportFactory).protocolFactory(protocolFactory);
+ return new TThreadedSelectorServer(serverArgs);
+ }
+
+ private TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
+ TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
+ LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
+ // Thrift's implementation uses '0' as a placeholder for 'use the default.'
+ int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
+ int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
+ THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
+ TServerTransport serverTransport = new TServerSocket(
+ new TServerSocket.ServerSocketTransportArgs().
+ bindAddr(inetSocketAddress).backlog(backlog).
+ clientTimeout(readTimeout));
+
+ TBoundedThreadPoolServer.Args serverArgs =
+ new TBoundedThreadPoolServer.Args(serverTransport, conf);
+ serverArgs.processor(processor).transportFactory(transportFactory)
+ .protocolFactory(protocolFactory);
+ return new TBoundedThreadPoolServer(serverArgs, metrics);
+ }
+
+ private TProtocolFactory getProtocolFactory() {
+ TProtocolFactory protocolFactory;
+
+ if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
+ LOG.debug("Using compact protocol");
+ protocolFactory = new TCompactProtocol.Factory();
+ } else {
+ LOG.debug("Using binary protocol");
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
+
+ return protocolFactory;
+ }
+
+ ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
+ int minWorkers, int maxWorkers) {
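+ // Daemon worker threads named "thrift-worker-%d"; core threads may time out so the
+ // pool can shrink when idle.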
+ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+ tfb.setDaemon(true);
+ tfb.setNameFormat("thrift-worker-%d");
+ ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
+ Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
+ threadPool.allowCoreThreadTimeOut(true);
+ return threadPool;
+ }
+
+ private InetAddress getBindAddress(Configuration conf)
+ throws UnknownHostException {
+ String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
+ return InetAddress.getByName(bindAddressStr);
+ }
+
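+ /**
+ * Register custom Thrift filters declared in configuration. Each entry is expected to be a
+ * "name:class" pair, e.g. (illustrative names only) "MyFilter:org.example.MyFilter";
+ * malformed entries are logged and skipped.
+ */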
+ public static void registerFilters(Configuration conf) {
+ String[] filters = conf.getStrings(THRIFT_FILTERS);
+ Splitter splitter = Splitter.on(':');
+ if (filters != null) {
+ for (String filterClass : filters) {
+ List<String> filterPart = splitter.splitToList(filterClass);
+ if (filterPart.size() != 2) {
+ LOG.warn("Invalid filter specification " + filterClass + " - skipping");
+ } else {
+ ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
+ }
+ }
+ }
+ }
+
+ /**
+ * Add the Thrift server's command line options.
+ * @param options the Options instance to populate
+ */
+ protected void addOptions(Options options) {
options.addOption("b", BIND_OPTION, true, "Address to bind " +
"the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
@@ -118,62 +624,56 @@ public class ThriftServer {
options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information");
+ options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
options.addOption("m", MIN_WORKERS_OPTION, true,
"The minimum number of worker threads for " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("w", MAX_WORKERS_OPTION, true,
"The maximum number of worker threads for " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
"The maximum number of queued requests in " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
"The amount of time in secods to keep a thread alive when idle in " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("t", READ_TIMEOUT_OPTION, true,
"Amount of time in milliseconds before a server thread will timeout " +
- "waiting for client to send data on a connected socket. Currently, " +
- "only applies to TBoundedThreadPoolServer");
+ "waiting for client to send data on a connected socket. Currently, " +
+ "only applies to TBoundedThreadPoolServer");
options.addOptionGroup(ImplType.createOptionGroup());
+ }
- CommandLineParser parser = new DefaultParser();
- CommandLine cmd = parser.parse(options, args);
-
- if (cmd.hasOption("help")) {
- printUsageAndExit(options, 1);
- }
-
+ protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
// Get port to bind to
try {
if (cmd.hasOption(PORT_OPTION)) {
int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
- conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
+ conf.setInt(PORT_CONF_KEY, listenPort);
}
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the port option", e);
printUsageAndExit(options, -1);
}
-
// check for user-defined info server port setting, if so override the conf
try {
if (cmd.hasOption(INFOPORT_OPTION)) {
String val = cmd.getOptionValue(INFOPORT_OPTION);
- conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
+ conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
LOG.debug("Web UI port set to " + val);
}
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
- " option", e);
+ " option", e);
printUsageAndExit(options, -1);
}
-
// Make optional changes to the configuration based on command-line options
optionToConf(cmd, MIN_WORKERS_OPTION,
conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
@@ -183,23 +683,42 @@ public class ThriftServer {
conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
- optionToConf(cmd, READ_TIMEOUT_OPTION, conf,
- ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
+ optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
+ optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
// Set general thrift server options
boolean compact = cmd.hasOption(COMPACT_OPTION) ||
- conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false);
- conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact);
+ conf.getBoolean(COMPACT_CONF_KEY, false);
+ conf.setBoolean(COMPACT_CONF_KEY, compact);
boolean framed = cmd.hasOption(FRAMED_OPTION) ||
- conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false);
- conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed);
- if (cmd.hasOption(BIND_OPTION)) {
- conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
- }
+ conf.getBoolean(FRAMED_CONF_KEY, false);
+ conf.setBoolean(FRAMED_CONF_KEY, framed);
+
+ optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
+
ImplType.setServerImpl(cmd, conf);
}
+ /**
+ * Parse the command line options and set the corresponding parameters in the conf.
+ */
+ private void processOptions(final String[] args) throws Exception {
+ if (args == null || args.length == 0) {
+ return;
+ }
+ Options options = new Options();
+ addOptions(options);
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ if (cmd.hasOption("help")) {
+ printUsageAndExit(options, 1);
+ }
+ parseCommandLine(cmd, options);
+ }
+
public void stop() {
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
@@ -209,10 +728,25 @@ public class ThriftServer {
LOG.error("Failed to stop infoServer", ex);
}
}
- serverRunner.shutdown();
+ if (pauseMonitor != null) {
+ pauseMonitor.stop();
+ }
+ if (tserver != null) {
+ tserver.stop();
+ tserver = null;
+ }
+ if (httpServer != null) {
+ try {
+ httpServer.stop();
+ httpServer = null;
+ } catch (Exception e) {
+ LOG.error("Problem encountered in shutting down HTTP server", e);
+ }
+ httpServer = null;
+ }
}
- private static void optionToConf(CommandLine cmd, String option,
+ protected static void optionToConf(CommandLine cmd, String option,
Configuration conf, String destConfKey) {
if (cmd.hasOption(option)) {
String value = cmd.getOptionValue(option);
@@ -221,16 +755,38 @@ public class ThriftServer {
}
}
+ /**
+ * Run the server without any command line arguments.
+ * @return exit code
+ * @throws Exception if the server fails to start
+ */
+ public int run() throws Exception {
+ return run(null);
+ }
+
+ @Override
+ public int run(String[] strings) throws Exception {
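+ // Boot sequence: apply CLI overrides to the conf, initialize security/metrics/handler
+ // state, start the info web UI, then serve over either Jetty (HTTP mode) or a Thrift
+ // TServer.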
+ processOptions(strings);
+ setupParameters();
+ startInfoServer();
+ if (httpEnabled) {
+ setupHTTPServer();
+ httpServer.start();
+ httpServer.join();
+ } else {
+ setupServer();
+ tserver.serve();
+ }
+ return 0;
+ }
+
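+ /**
+ * Entry point: runs the server through ToolRunner so Hadoop generic options are handled
+ * before the Thrift-specific ones, e.g. (illustrative) {@code --port 9090 --infoport 9095}.
+ */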
public static void main(String [] args) throws Exception {
LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
VersionInfo.logVersion();
- int exitCode = 0;
- try {
- new ThriftServer(HBaseConfiguration.create()).doMain(args);
- } catch (ExitCodeException ex) {
- exitCode = ex.getExitCode();
- }
+ final Configuration conf = HBaseConfiguration.create();
+ // for now, only time we return is on an argument error.
+ final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
- System.exit(exitCode);
+ System.exit(status);
}
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
deleted file mode 100644
index 5e248f1fedc..00000000000
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ /dev/null
@@ -1,2031 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.thrift;
-
-import static org.apache.hadoop.hbase.util.Bytes.getBytes;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.SaslServer;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell.Type;
-import org.apache.hadoop.hbase.CellBuilder;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.http.HttpServerUtil;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
-import org.apache.hadoop.hbase.security.SecurityUtil;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
-import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
-import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
-import org.apache.hadoop.hbase.thrift.generated.IOError;
-import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
-import org.apache.hadoop.hbase.thrift.generated.Mutation;
-import org.apache.hadoop.hbase.thrift.generated.TAppend;
-import org.apache.hadoop.hbase.thrift.generated.TCell;
-import org.apache.hadoop.hbase.thrift.generated.TIncrement;
-import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
-import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.generated.TScan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ConnectionCache;
-import org.apache.hadoop.hbase.util.DNS;
-import org.apache.hadoop.hbase.util.JvmPauseMonitor;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServlet;
-import org.apache.thrift.server.TThreadedSelectorServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TSaslServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.server.HttpConfiguration;
-import org.eclipse.jetty.server.HttpConnectionFactory;
-import org.eclipse.jetty.server.SecureRequestCustomizer;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.SslConnectionFactory;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
-import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
-
-/**
- * ThriftServerRunner - this class starts up a Thrift server which implements
- * the Hbase API specified in the Hbase.thrift IDL file.
- */
-@InterfaceAudience.Private
-public class ThriftServerRunner implements Runnable {
-
- private static final Logger LOG = LoggerFactory.getLogger(ThriftServerRunner.class);
-
- private static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k
-
- static final String SERVER_TYPE_CONF_KEY =
- "hbase.regionserver.thrift.server.type";
-
- static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
- static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
- static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
- static final String MAX_FRAME_SIZE_CONF_KEY =
- "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
- static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
- static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
- static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
- static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min";
- static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max";
-
- static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled";
- static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store";
- static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY = "hbase.thrift.ssl.keystore.password";
- static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY = "hbase.thrift.ssl.keystore.keypassword";
- static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY =
- "hbase.thrift.ssl.exclude.cipher.suites";
- static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY =
- "hbase.thrift.ssl.include.cipher.suites";
- static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.exclude.protocols";
- static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.include.protocols";
-
- static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser";
-
- static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface";
- static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver";
- static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal";
- static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file";
- static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal";
- static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file";
-
- /**
- * Amount of time in milliseconds before a server thread will timeout
- * waiting for client to send data on a connected socket. Currently,
- * applies only to TBoundedThreadPoolServer
- */
- public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
- "hbase.thrift.server.socket.read.timeout";
- public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
-
-
- /**
- * Thrift quality of protection configuration key. Valid values can be:
- * auth-conf: authentication, integrity and confidentiality checking
- * auth-int: authentication and integrity checking
- * auth: authentication only
- *
- * This is used to authenticate the callers and support impersonation.
- * The thrift server and the HBase cluster must run in secure mode.
- */
- static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
- static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
-
- private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
- public static final int DEFAULT_LISTEN_PORT = 9090;
- public static final int HREGION_VERSION = 1;
-
- private final int listenPort;
-
- private Configuration conf;
- volatile TServer tserver;
- volatile Server httpServer;
- private final Hbase.Iface handler;
- private final ThriftMetrics metrics;
- private final HBaseHandler hbaseHandler;
- private final UserGroupInformation serviceUGI;
-
- private SaslUtil.QualityOfProtection qop;
- private String host;
-
- private final boolean securityEnabled;
- private final boolean doAsEnabled;
-
- private final JvmPauseMonitor pauseMonitor;
-
- static String THRIFT_HTTP_ALLOW_OPTIONS_METHOD = "hbase.thrift.http.allow.options.method";
- private static boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false;
-
- /** An enum of server implementation selections */
- public enum ImplType {
- HS_HA("hsha", true, THsHaServer.class, true),
- NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
- THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
- THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true);
-
- public static final ImplType DEFAULT = THREAD_POOL;
-
- final String option;
- final boolean isAlwaysFramed;
- final Class<? extends TServer> serverClass;
- final boolean canSpecifyBindIP;
-
- private ImplType(String option, boolean isAlwaysFramed,
- Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
- this.option = option;
- this.isAlwaysFramed = isAlwaysFramed;
- this.serverClass = serverClass;
- this.canSpecifyBindIP = canSpecifyBindIP;
- }
-
- /**
- * @return -option
- */
- @Override
- public String toString() {
- return "-" + option;
- }
-
- public String getOption() {
- return option;
- }
-
- public boolean isAlwaysFramed() {
- return isAlwaysFramed;
- }
-
- public String getDescription() {
- StringBuilder sb = new StringBuilder("Use the " +
- serverClass.getSimpleName());
- if (isAlwaysFramed) {
- sb.append(" This implies the framed transport.");
- }
- if (this == DEFAULT) {
- sb.append("This is the default.");
- }
- return sb.toString();
- }
-
- static OptionGroup createOptionGroup() {
- OptionGroup group = new OptionGroup();
- for (ImplType t : values()) {
- group.addOption(new Option(t.option, t.getDescription()));
- }
- return group;
- }
-
- public static ImplType getServerImpl(Configuration conf) {
- String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
- for (ImplType t : values()) {
- if (confType.equals(t.option)) {
- return t;
- }
- }
- throw new AssertionError("Unknown server ImplType.option:" + confType);
- }
-
- static void setServerImpl(CommandLine cmd, Configuration conf) {
- ImplType chosenType = null;
- int numChosen = 0;
- for (ImplType t : values()) {
- if (cmd.hasOption(t.option)) {
- chosenType = t;
- ++numChosen;
- }
- }
- if (numChosen < 1) {
- LOG.info("Using default thrift server type");
- chosenType = DEFAULT;
- } else if (numChosen > 1) {
- throw new AssertionError("Exactly one option out of " +
- Arrays.toString(values()) + " has to be specified");
- }
- LOG.info("Using thrift server type " + chosenType.option);
- conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
- }
-
- public String simpleClassName() {
- return serverClass.getSimpleName();
- }
-
- public static List<String> serversThatCannotSpecifyBindIP() {
- List<String> l = new ArrayList<>();
- for (ImplType t : values()) {
- if (!t.canSpecifyBindIP) {
- l.add(t.simpleClassName());
- }
- }
- return l;
- }
- }
-
- public ThriftServerRunner(Configuration conf) throws IOException {
- // login the server principal (if using secure Hadoop)
- UserProvider userProvider = UserProvider.instantiate(conf);
- securityEnabled = userProvider.isHadoopSecurityEnabled()
- && userProvider.isHBaseSecurityEnabled();
- if (securityEnabled) {
- host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
- conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
- conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
- userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
- }
- this.serviceUGI = userProvider.getCurrent().getUGI();
-
- this.conf = HBaseConfiguration.create(conf);
- this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
- this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
- this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
- this.hbaseHandler = new HBaseHandler(conf, userProvider);
- this.hbaseHandler.initMetrics(metrics);
- this.handler = HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, conf);
-
- boolean httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
- doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
- if (doAsEnabled && !httpEnabled) {
- LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
- }
-
- String strQop = conf.get(THRIFT_QOP_KEY);
- if (strQop != null) {
- this.qop = SaslUtil.getQop(strQop);
- }
- if (qop != null) {
- if (qop != QualityOfProtection.AUTHENTICATION &&
- qop != QualityOfProtection.INTEGRITY &&
- qop != QualityOfProtection.PRIVACY) {
- throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
- THRIFT_QOP_KEY,
- QualityOfProtection.AUTHENTICATION.name(),
- QualityOfProtection.INTEGRITY.name(),
- QualityOfProtection.PRIVACY.name()));
- }
- checkHttpSecurity(qop, conf);
- if (!securityEnabled) {
- throw new IOException("Thrift server must run in secure mode to support authentication");
- }
- }
- }
-
- private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
- if (qop == QualityOfProtection.PRIVACY &&
- conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
- !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
- throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
- THRIFT_SSL_ENABLED_KEY + " is false");
- }
- }
-
- /*
- * Runs the Thrift server
- */
- @Override
- public void run() {
- serviceUGI.doAs(new PrivilegedAction