HBASE-11349 [Thrift] support authentication/impersonation

Jimmy Xiang 2014-06-18 09:45:06 -07:00
parent b47a3f6924
commit 79a03c8bcb
10 changed files with 455 additions and 236 deletions

RESTServer.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.util.HttpServerUtil;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.UserGroupInformation;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
@@ -108,8 +107,7 @@ public class RESTServer implements Constants {
}
}
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
RESTServlet servlet = RESTServlet.getInstance(conf, realUser);
RESTServlet servlet = RESTServlet.getInstance(conf, userProvider);
Options options = new Options();
options.addOption("p", "port", true, "Port to bind to [default: 8080]");

RESTServlet.java

@@ -19,24 +19,14 @@
package org.apache.hadoop.hbase.rest;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
@@ -49,98 +39,16 @@ public class RESTServlet implements Constants {
private static RESTServlet INSTANCE;
private final Configuration conf;
private final MetricsREST metrics = new MetricsREST();
private final Map<String, ConnectionInfo>
connections = new ConcurrentHashMap<String, ConnectionInfo>();
private final KeyLocker<String> locker = new KeyLocker<String>();
private final ConnectionCache connectionCache;
private final UserGroupInformation realUser;
static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
private final ThreadLocal<UserGroupInformation> effectiveUser =
new ThreadLocal<UserGroupInformation>() {
protected UserGroupInformation initialValue() {
return realUser;
}
};
UserGroupInformation getRealUser() {
return realUser;
}
// A chore to clean up idle connections.
private final Chore connectionCleaner;
private final Stoppable stoppable;
private UserProvider userProvider;
class ConnectionInfo {
final HConnection connection;
final String userName;
volatile HBaseAdmin admin;
private long lastAccessTime;
private boolean closed;
ConnectionInfo(HConnection conn, String user) {
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
connection = conn;
closed = false;
userName = user;
}
synchronized boolean updateAccessTime() {
if (closed) {
return false;
}
if (connection.isAborted() || connection.isClosed()) {
LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
connections.remove(userName);
return false;
}
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
return true;
}
synchronized boolean timedOut(int maxIdleTime) {
long timeoutTime = lastAccessTime + maxIdleTime;
if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
connections.remove(userName);
closed = true;
return true;
}
return false;
}
}
class ConnectionCleaner extends Chore {
private final int maxIdleTime;
public ConnectionCleaner(int cleanInterval, int maxIdleTime) {
super("REST-ConnectionCleaner", cleanInterval, stoppable);
this.maxIdleTime = maxIdleTime;
}
@Override
protected void chore() {
for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
ConnectionInfo connInfo = entry.getValue();
if (connInfo.timedOut(maxIdleTime)) {
if (connInfo.admin != null) {
try {
connInfo.admin.close();
} catch (Throwable t) {
LOG.info("Got exception in closing idle admin", t);
}
}
try {
connInfo.connection.close();
} catch (Throwable t) {
LOG.info("Got exception in closing idle connection", t);
}
}
}
}
}
/**
* @return the RESTServlet singleton instance
*/
@@ -153,11 +61,12 @@ public class RESTServlet implements Constants {
* @param conf Existing configuration to use in rest servlet
* @param realUser the login user
* @return the RESTServlet singleton instance
* @throws IOException
*/
public synchronized static RESTServlet getInstance(Configuration conf,
UserGroupInformation realUser) {
UserProvider userProvider) throws IOException {
if (INSTANCE == null) {
INSTANCE = new RESTServlet(conf, realUser);
INSTANCE = new RESTServlet(conf, userProvider);
}
return INSTANCE;
}
@@ -169,52 +78,30 @@ public class RESTServlet implements Constants {
/**
* Constructor with existing configuration
* @param conf existing configuration
* @param realUser the login user
* @param userProvider the login user provider
* @throws IOException
*/
RESTServlet(final Configuration conf,
final UserGroupInformation realUser) {
this.userProvider = UserProvider.instantiate(conf);
stoppable = new Stoppable() {
private volatile boolean isStopped = false;
@Override public void stop(String why) { isStopped = true;}
@Override public boolean isStopped() {return isStopped;}
};
final UserProvider userProvider) throws IOException {
this.realUser = userProvider.getCurrent().getUGI();
this.conf = conf;
registerCustomFilter(conf);
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCleaner = new ConnectionCleaner(cleanInterval, maxIdleTime);
Threads.setDaemonThreadRunning(connectionCleaner.getThread());
this.realUser = realUser;
this.conf = conf;
registerCustomFilter(conf);
connectionCache = new ConnectionCache(
conf, userProvider, cleanInterval, maxIdleTime);
}
/**
* Caller doesn't close the admin afterwards.
* We need to manage it and close it properly.
*/
HBaseAdmin getAdmin() throws IOException {
ConnectionInfo connInfo = getCurrentConnection();
if (connInfo.admin == null) {
Lock lock = locker.acquireLock(effectiveUser.get().getUserName());
try {
if (connInfo.admin == null) {
connInfo.admin = new HBaseAdmin(connInfo.connection);
}
} finally {
lock.unlock();
}
}
return connInfo.admin;
return connectionCache.getAdmin();
}
/**
* Caller closes the table afterwards.
*/
HTableInterface getTable(String tableName) throws IOException {
ConnectionInfo connInfo = getCurrentConnection();
return connInfo.connection.getTable(tableName);
return connectionCache.getTable(tableName);
}
Configuration getConfiguration() {
@@ -234,30 +121,10 @@ public class RESTServlet implements Constants {
return getConfiguration().getBoolean("hbase.rest.readonly", false);
}
void setEffectiveUser(UserGroupInformation effectiveUser) {
this.effectiveUser.set(effectiveUser);
void setEffectiveUser(String effectiveUser) {
connectionCache.setEffectiveUser(effectiveUser);
}
private ConnectionInfo getCurrentConnection() throws IOException {
String userName = effectiveUser.get().getUserName();
ConnectionInfo connInfo = connections.get(userName);
if (connInfo == null || !connInfo.updateAccessTime()) {
Lock lock = locker.acquireLock(userName);
try {
connInfo = connections.get(userName);
if (connInfo == null) {
User user = userProvider.create(effectiveUser.get());
HConnection conn = HConnectionManager.createConnection(conf, user);
connInfo = new ConnectionInfo(conn, userName);
connections.put(userName, connInfo);
}
} finally {
lock.unlock();
}
}
return connInfo;
}
private void registerCustomFilter(Configuration conf) {
String[] filterList = conf.getStrings(Constants.CUSTOM_FILTERS);
if (filterList != null) {
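// Aside (not part of the diff): the pieces connect as follows. RESTServletContainer
// derives the effective user per request (the remote user, or the validated ?doAs
// value) and calls servlet.setEffectiveUser(name); RESTServlet forwards that call and
// getTable()/getAdmin() to the shared ConnectionCache, which keys HConnections by
// user name. A hypothetical per-request flow inside this package:
//
//   RESTServlet servlet = RESTServlet.getInstance(conf, UserProvider.instantiate(conf));
//   servlet.setEffectiveUser("alice");              // normally done by the container
//   HTableInterface table = servlet.getTable("t1"); // backed by alice's cached HConnection
//   try { /* serve the request */ } finally { table.close(); }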

RESTServletContainer.java

@@ -50,28 +50,27 @@ public class RESTServletContainer extends ServletContainer {
public void service(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
final String doAsUserFromQuery = request.getParameter("doAs");
Configuration conf = RESTServlet.getInstance().getConfiguration();
final boolean proxyConfigured = conf.getBoolean("hbase.rest.support.proxyuser", false);
if (doAsUserFromQuery != null && !proxyConfigured) {
throw new ServletException("Support for proxyuser is not configured");
}
UserGroupInformation ugi = RESTServlet.getInstance().getRealUser();
RESTServlet servlet = RESTServlet.getInstance();
if (doAsUserFromQuery != null) {
Configuration conf = servlet.getConfiguration();
if (!conf.getBoolean("hbase.rest.support.proxyuser", false)) {
throw new ServletException("Support for proxyuser is not configured");
}
UserGroupInformation ugi = servlet.getRealUser();
// create and attempt to authorize a proxy user (the client is attempting
// to do proxy user)
ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
// validate the proxy user authorization
try {
ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
} catch(AuthorizationException e) {
throw new ServletException(e.getMessage());
}
servlet.setEffectiveUser(doAsUserFromQuery);
} else {
// the REST server would send the request without validating the proxy
// user authorization
ugi = UserGroupInformation.createProxyUser(request.getRemoteUser(), ugi);
String effectiveUser = request.getRemoteUser();
servlet.setEffectiveUser(effectiveUser);
}
RESTServlet.getInstance().setEffectiveUser(ugi);
super.service(request, response);
}
}
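For the doAs path above to succeed, the deployment must opt in to proxying and grant the REST server principal impersonation rights through Hadoop's standard proxyuser keys, which ProxyUsers.authorize(...) consults. A hedged sketch (the short user name "rest" and the host/group values are placeholders):

conf.setBoolean("hbase.rest.support.proxyuser", true);            // the gate checked above
conf.set("hadoop.proxyuser.rest.hosts", "resthost.example.com");  // hosts 'rest' may proxy from
conf.set("hadoop.proxyuser.rest.groups", "analysts");             // groups 'rest' may impersonate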

SecurityUtil.java (new file)

@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.security;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Security related generic utility methods.
*/
@InterfaceAudience.Private
public class SecurityUtil {
/**
* Get the user name from a principal
*/
public static String getUserFromPrincipal(final String principal) {
int i = principal.indexOf("/");
if (i == -1) {
i = principal.indexOf("@");
}
return (i > -1) ? principal.substring(0, i) : principal;
}
}
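A few concrete inputs make the parsing rule clear: everything before the first "/" or, failing that, the first "@" is taken as the user name. Illustrative calls (not part of the commit):

SecurityUtil.getUserFromPrincipal("hbase/host1.example.com@EXAMPLE.COM"); // "hbase"
SecurityUtil.getUserFromPrincipal("alice@EXAMPLE.COM");                   // "alice"
SecurityUtil.getUserFromPrincipal("bob");                                 // "bob" (no separator)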

ConnectionCache.java (new file)

@@ -0,0 +1,203 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
/**
* A utility to store user specific HConnections in memory.
* There is a chore to clean up connections idle for too long.
* This class is used by REST server and Thrift server to
* support authentication and impersonation.
*/
@InterfaceAudience.Private
public class ConnectionCache {
private static Logger LOG = Logger.getLogger(ConnectionCache.class);
private final Map<String, ConnectionInfo>
connections = new ConcurrentHashMap<String, ConnectionInfo>();
private final KeyLocker<String> locker = new KeyLocker<String>();
private final String realUserName;
private final UserGroupInformation realUser;
private final UserProvider userProvider;
private final Configuration conf;
private final ThreadLocal<String> effectiveUserNames =
new ThreadLocal<String>() {
protected String initialValue() {
return realUserName;
}
};
public ConnectionCache(final Configuration conf,
final UserProvider userProvider,
final int cleanInterval, final int maxIdleTime) throws IOException {
Stoppable stoppable = new Stoppable() {
private volatile boolean isStopped = false;
@Override public void stop(String why) { isStopped = true;}
@Override public boolean isStopped() {return isStopped;}
};
Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) {
@Override
protected void chore() {
for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
ConnectionInfo connInfo = entry.getValue();
if (connInfo.timedOut(maxIdleTime)) {
if (connInfo.admin != null) {
try {
connInfo.admin.close();
} catch (Throwable t) {
LOG.info("Got exception in closing idle admin", t);
}
}
try {
connInfo.connection.close();
} catch (Throwable t) {
LOG.info("Got exception in closing idle connection", t);
}
}
}
}
};
// Start the daemon cleaner chore
Threads.setDaemonThreadRunning(cleaner.getThread());
this.realUser = userProvider.getCurrent().getUGI();
this.realUserName = realUser.getShortUserName();
this.userProvider = userProvider;
this.conf = conf;
}
/**
* Set the current thread local effective user
*/
public void setEffectiveUser(String user) {
effectiveUserNames.set(user);
}
/**
* Caller doesn't close the admin afterwards.
* We need to manage it and close it properly.
*/
@SuppressWarnings("deprecation")
public HBaseAdmin getAdmin() throws IOException {
ConnectionInfo connInfo = getCurrentConnection();
if (connInfo.admin == null) {
Lock lock = locker.acquireLock(effectiveUserNames.get());
try {
if (connInfo.admin == null) {
connInfo.admin = new HBaseAdmin(connInfo.connection);
}
} finally {
lock.unlock();
}
}
return connInfo.admin;
}
/**
* Caller closes the table afterwards.
*/
public HTableInterface getTable(String tableName) throws IOException {
ConnectionInfo connInfo = getCurrentConnection();
return connInfo.connection.getTable(tableName);
}
/**
* Get the cached connection for the current user.
* If none or timed out, create a new one.
*/
ConnectionInfo getCurrentConnection() throws IOException {
String userName = effectiveUserNames.get();
ConnectionInfo connInfo = connections.get(userName);
if (connInfo == null || !connInfo.updateAccessTime()) {
Lock lock = locker.acquireLock(userName);
try {
connInfo = connections.get(userName);
if (connInfo == null) {
UserGroupInformation ugi = realUser;
if (!userName.equals(realUserName)) {
ugi = UserGroupInformation.createProxyUser(userName, realUser);
}
User user = userProvider.create(ugi);
HConnection conn = HConnectionManager.createConnection(conf, user);
connInfo = new ConnectionInfo(conn, userName);
connections.put(userName, connInfo);
}
} finally {
lock.unlock();
}
}
return connInfo;
}
class ConnectionInfo {
final HConnection connection;
final String userName;
volatile HBaseAdmin admin;
private long lastAccessTime;
private boolean closed;
ConnectionInfo(HConnection conn, String user) {
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
connection = conn;
closed = false;
userName = user;
}
synchronized boolean updateAccessTime() {
if (closed) {
return false;
}
if (connection.isAborted() || connection.isClosed()) {
LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
connections.remove(userName);
return false;
}
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
return true;
}
synchronized boolean timedOut(int maxIdleTime) {
long timeoutTime = lastAccessTime + maxIdleTime;
if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
connections.remove(userName);
closed = true;
return true;
}
return false;
}
}
}
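A minimal usage sketch of the new class (hypothetical caller; the 10-second sweep and 10-minute idle timeout mirror the defaults the REST and Thrift handlers pass in):

Configuration conf = HBaseConfiguration.create();
UserProvider provider = UserProvider.instantiate(conf);
ConnectionCache cache = new ConnectionCache(conf, provider, 10 * 1000, 10 * 60 * 1000);

cache.setEffectiveUser("alice");              // thread-local; a proxy UGI is created lazily
HTableInterface table = cache.getTable("t1"); // alice's cached HConnection, one per user
try {
  // ... work on behalf of alice ...
} finally {
  table.close();                              // tables are closed by the caller
}
// Admins from cache.getAdmin() are owned by the cache and closed by the cleaner chore.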

HBaseRESTTestingUtility.java

@@ -22,7 +22,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.HttpServerUtil;
import org.apache.hadoop.util.StringUtils;
import org.mortbay.jetty.Server;
@@ -49,7 +49,7 @@ public class HBaseRESTTestingUtility {
}
// Inject the conf for the test by being first to make singleton
RESTServlet.getInstance(conf, User.getCurrent().getUGI());
RESTServlet.getInstance(conf, UserProvider.instantiate(conf));
// set up the Jersey servlet container for Jetty
ServletHolder sh = new ServletHolder(ServletContainer.class);

TestGetAndPutResource.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
@@ -421,18 +422,18 @@ public class TestGetAndPutResource extends RowResourceBase {
response = deleteRow(TABLE, ROW_4);
assertEquals(response.getCode(), 200);
UserGroupInformation ugi = User.getCurrent().getUGI();
UserProvider userProvider = UserProvider.instantiate(conf);
METRICS_ASSERT.assertCounterGt("requests", 2l,
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
METRICS_ASSERT.assertCounterGt("successfulGet", 0l,
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
METRICS_ASSERT.assertCounterGt("successfulPut", 0l,
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
METRICS_ASSERT.assertCounterGt("successfulDelete", 0l,
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
}
@Test

ThriftServer.java

@@ -32,11 +32,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.Shell.ExitCodeException;
/**
@@ -92,17 +89,6 @@ public class ThriftServer {
void doMain(final String[] args) throws Exception {
processOptions(args);
UserProvider userProvider = UserProvider.instantiate(conf);
// login the server principal (if using secure Hadoop)
if (userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled()) {
String machineName =
Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.thrift.dns.interface", "default"),
conf.get("hbase.thrift.dns.nameserver", "default")));
userProvider
.login("hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", machineName);
}
serverRunner = new ThriftServerRunner(conf);
// Put up info server.
@@ -168,9 +154,10 @@ public class ThriftServer {
// Get port to bind to
try {
int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION,
String.valueOf(DEFAULT_LISTEN_PORT)));
conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
if (cmd.hasOption(PORT_OPTION)) {
int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
}
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the port option", e);
printUsageAndExit(options, -1);

ThriftServerRunner.java

@@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -38,6 +39,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
@@ -70,6 +77,8 @@ import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
@@ -85,9 +94,16 @@ import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.thrift.generated.TScan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
@@ -96,6 +112,7 @@ import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
@@ -108,6 +125,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* the Hbase API specified in the Hbase.thrift IDL file.
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class ThriftServerRunner implements Runnable {
private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
@@ -122,6 +140,17 @@ public class ThriftServerRunner implements Runnable {
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
/**
* Thrift quality of protection configuration key. Valid values can be:
* auth-conf: authentication, integrity and confidentiality checking
* auth-int: authentication and integrity checking
* auth: authentication only
*
* This is used to authenticate the callers and support impersonation.
* The thrift server and the HBase cluster must run in secure mode.
*/
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
public static final int DEFAULT_LISTEN_PORT = 9090;
private final int listenPort;
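// Aside (not part of the diff): a hedged deployment sketch tying the three keys
// together; the key names are the ones referenced in this file, the values are
// placeholders.
//
//   conf.set("hbase.thrift.security.qop", "auth-conf");  // or "auth" / "auth-int"
//   conf.set("hbase.thrift.kerberos.principal", "hbase/_HOST@EXAMPLE.COM");
//   conf.set("hbase.thrift.keytab.file", "/etc/hbase/conf/hbase.keytab");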
@@ -130,6 +159,11 @@
volatile TServer tserver;
private final Hbase.Iface handler;
private final ThriftMetrics metrics;
private final HBaseHandler hbaseHandler;
private final UserGroupInformation realUser;
private final String qop;
private String host;
/** An enum of server implementation selections */
enum ImplType {
@@ -230,15 +264,37 @@
}
public ThriftServerRunner(Configuration conf) throws IOException {
this(conf, new ThriftServerRunner.HBaseHandler(conf));
}
public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
UserProvider userProvider = UserProvider.instantiate(conf);
// login the server principal (if using secure Hadoop)
boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
&& userProvider.isHBaseSecurityEnabled();
if (securityEnabled) {
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.thrift.dns.interface", "default"),
conf.get("hbase.thrift.dns.nameserver", "default")));
userProvider.login("hbase.thrift.keytab.file",
"hbase.thrift.kerberos.principal", host);
}
this.conf = HBaseConfiguration.create(conf);
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
handler.initMetrics(metrics);
this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
this.hbaseHandler = new HBaseHandler(conf, userProvider);
this.hbaseHandler.initMetrics(metrics);
this.handler = HbaseHandlerMetricsProxy.newInstance(
hbaseHandler, metrics, conf);
this.realUser = userProvider.getCurrent().getUGI();
qop = conf.get(THRIFT_QOP_KEY);
if (qop != null) {
if (!qop.equals("auth") && !qop.equals("auth-int")
&& !qop.equals("auth-conf")) {
throw new IOException("Invalid hbase.thrift.security.qop: " + qop
+ ", it must be 'auth', 'auth-int', or 'auth-conf'");
}
if (!securityEnabled) {
throw new IOException("Thrift server must"
+ " run in secure mode to support authentication");
}
}
}
/*
@@ -246,14 +302,21 @@
*/
@Override
public void run() {
try {
setupServer();
tserver.serve();
} catch (Exception e) {
LOG.fatal("Cannot run ThriftServer", e);
// Crash the process if the ThriftServer is not running
System.exit(-1);
}
realUser.doAs(
new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
setupServer();
tserver.serve();
} catch (Exception e) {
LOG.fatal("Cannot run ThriftServer", e);
// Crash the process if the ThriftServer is not running
System.exit(-1);
}
return null;
}
});
}
public void shutdown() {
@@ -277,18 +340,72 @@
protocolFactory = new TBinaryProtocol.Factory();
}
Hbase.Processor<Hbase.Iface> processor =
new Hbase.Processor<Hbase.Iface>(handler);
final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
ImplType implType = ImplType.getServerImpl(conf);
TProcessor processor = p;
// Construct correct TransportFactory
TTransportFactory transportFactory;
if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
if (qop != null) {
throw new RuntimeException("Thrift server authentication"
+ " doesn't work with framed transport yet");
}
transportFactory = new TFramedTransport.Factory(
conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024);
LOG.debug("Using framed transport");
} else {
} else if (qop == null) {
transportFactory = new TTransportFactory();
} else {
// Extract the name from the principal
String name = SecurityUtil.getUserFromPrincipal(
conf.get("hbase.thrift.kerberos.principal"));
Map<String, String> saslProperties = new HashMap<String, String>();
saslProperties.put(Sasl.QOP, qop);
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
new SaslGssCallbackHandler() {
@Override
public void handle(Callback[] callbacks)
throws UnsupportedCallbackException {
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL GSSAPI Callback");
}
}
if (ac != null) {
String authid = ac.getAuthenticationID();
String authzid = ac.getAuthorizationID();
if (!authid.equals(authzid)) {
ac.setAuthorized(false);
} else {
ac.setAuthorized(true);
String userName = SecurityUtil.getUserFromPrincipal(authzid);
LOG.info("Effective user: " + userName);
ac.setAuthorizedID(userName);
}
}
}
});
transportFactory = saslFactory;
// Create a processor wrapper, to get the caller
processor = new TProcessor() {
@Override
public boolean process(TProtocol inProt,
TProtocol outProt) throws TException {
TSaslServerTransport saslServerTransport =
(TSaslServerTransport)inProt.getTransport();
SaslServer saslServer = saslServerTransport.getSaslServer();
String principal = saslServer.getAuthorizationID();
hbaseHandler.setEffectiveUser(principal);
return p.process(inProt, outProt);
}
};
}
if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
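// Aside (not part of the diff): the client-side counterpart is a GSSAPI SASL
// transport with a matching QOP. A hypothetical libthrift sketch (host, port,
// and QOP value are placeholders):
//
//   Map<String, String> saslProps = new HashMap<String, String>();
//   saslProps.put(Sasl.QOP, "auth-conf");
//   TTransport transport = new TSaslClientTransport(
//       "GSSAPI", null, "hbase", "thrift.example.com", // service part of the server
//       saslProps, null,                               // principal, and server host
//       new TSocket("thrift.example.com", 9090));
//   transport.open();
//   Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));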
@@ -415,7 +532,6 @@
*/
public static class HBaseHandler implements Hbase.Iface {
protected Configuration conf;
protected volatile HBaseAdmin admin = null;
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
// nextScannerId and scannerMap are used to manage scanner state
@@ -423,6 +539,8 @@
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
private ThriftMetrics metrics = null;
private final ConnectionCache connectionCache;
private static ThreadLocal<Map<String, HTable>> threadLocalTables =
new ThreadLocal<Map<String, HTable>>() {
@Override
@@ -433,6 +551,9 @@
IncrementCoalescer coalescer = null;
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
/**
* Returns a list of all the column families for a given htable.
*
@@ -460,10 +581,10 @@
*/
public HTable getTable(final byte[] tableName) throws
IOException {
String table = new String(tableName);
String table = Bytes.toString(tableName);
Map<String, HTable> tables = threadLocalTables.get();
if (!tables.containsKey(table)) {
tables.put(table, new HTable(conf, tableName));
tables.put(table, (HTable)connectionCache.getTable(table));
}
return tables.get(table);
}
@@ -507,33 +628,27 @@
return scannerMap.remove(id);
}
/**
* Constructs an HBaseHandler object.
* @throws IOException
*/
protected HBaseHandler()
throws IOException {
this(HBaseConfiguration.create());
}
protected HBaseHandler(final Configuration c) throws IOException {
protected HBaseHandler(final Configuration c,
final UserProvider userProvider) throws IOException {
this.conf = c;
scannerMap = new HashMap<Integer, ResultScannerWrapper>();
this.coalescer = new IncrementCoalescer(this);
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCache = new ConnectionCache(
conf, userProvider, cleanInterval, maxIdleTime);
}
/**
* Obtain HBaseAdmin. Creates the instance if it is not already created.
*/
private HBaseAdmin getHBaseAdmin() throws IOException {
if (admin == null) {
synchronized (this) {
if (admin == null) {
admin = new HBaseAdmin(conf);
}
}
}
return admin;
return connectionCache.getAdmin();
}
void setEffectiveUser(String effectiveUser) {
connectionCache.setEffectiveUser(effectiveUser);
}
@Override

TestThriftServer.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
@@ -137,7 +138,8 @@ public class TestThriftServer {
*/
public void doTestTableCreateDrop() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
doTestTableCreateDrop(handler);
}
@@ -151,7 +153,7 @@
protected MySlowHBaseHandler(Configuration c)
throws IOException {
super(c);
super(c, UserProvider.instantiate(c));
}
@Override
@@ -256,7 +258,8 @@ public class TestThriftServer {
public void doTestIncrements() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
createTestTables(handler);
doTestIncrements(handler);
dropTestTables(handler);
@@ -303,7 +306,8 @@
*/
public void doTestTableMutations() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
doTestTableMutations(handler);
}
@@ -381,7 +385,8 @@
public void doTestTableTimestampsAndColumns() throws Exception {
// Setup
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
handler.createTable(tableAname, getColumnDescriptors());
// Apply timestamped Mutations to rowA
@@ -460,7 +465,8 @@
public void doTestTableScanners() throws Exception {
// Setup
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
handler.createTable(tableAname, getColumnDescriptors());
// Apply timestamped Mutations to rowA
@@ -579,7 +585,8 @@
*/
public void doTestGetTableRegions() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
doTestGetTableRegions(handler);
}
@@ -614,7 +621,8 @@
public void doTestGetRegionInfo() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
doTestGetRegionInfo(handler);
}
@@ -642,7 +650,8 @@
*/
public static void doTestAppend() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
handler.createTable(tableAname, getColumnDescriptors());
try {
List<Mutation> mutations = new ArrayList<Mutation>(1);
@@ -675,7 +684,8 @@
*/
public static void doTestCheckAndPut() throws Exception {
ThriftServerRunner.HBaseHandler handler =
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
UserProvider.instantiate(UTIL.getConfiguration()));
handler.createTable(tableAname, getColumnDescriptors());
try {
List<Mutation> mutations = new ArrayList<Mutation>(1);