HBASE-11474 [Thrift2] support authentication/impersonation

This commit is contained in:
Jimmy Xiang 2014-07-08 10:29:20 -07:00
parent 213e565bce
commit fc57363404
6 changed files with 229 additions and 27 deletions

View File

@ -107,6 +107,13 @@ public class ConnectionCache {
effectiveUserNames.set(user);
}
/**
* Get the current thread local effective user
*/
public String getEffectiveUser() {
return effectiveUserNames.get();
}
/**
* Caller doesn't close the admin afterwards.
* We need to manage it and close it properly.
@ -115,7 +122,7 @@ public class ConnectionCache {
public HBaseAdmin getAdmin() throws IOException {
ConnectionInfo connInfo = getCurrentConnection();
if (connInfo.admin == null) {
Lock lock = locker.acquireLock(effectiveUserNames.get());
Lock lock = locker.acquireLock(getEffectiveUser());
try {
if (connInfo.admin == null) {
connInfo.admin = new HBaseAdmin(connInfo.connection);
@ -140,7 +147,7 @@ public class ConnectionCache {
* If none or timed out, create a new one.
*/
ConnectionInfo getCurrentConnection() throws IOException {
String userName = effectiveUserNames.get();
String userName = getEffectiveUser();
ConnectionInfo connInfo = connections.get(userName);
if (connInfo == null || !connInfo.updateAccessTime()) {
Lock lock = locker.acquireLock(userName);

View File

@ -287,7 +287,7 @@ public class ThriftServerRunner implements Runnable {
if (qop != null) {
if (!qop.equals("auth") && !qop.equals("auth-int")
&& !qop.equals("auth-conf")) {
throw new IOException("Invalid hbase.thrift.security.qop: " + qop
throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
+ ", it must be 'auth', 'auth-int', or 'auth-conf'");
}
if (!securityEnabled) {

View File

@ -30,30 +30,41 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableFactory;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConnectionCache;
import org.apache.thrift.TException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
* defined in the HTableInterface.
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class ThriftHBaseServiceHandler implements THBaseService.Iface {
// TODO: Size of pool configuraple
private final HTablePool htablePool;
private final Cache<String, HTablePool> htablePools;
private final Callable<? extends HTablePool> htablePoolCreater;
private static final Log LOG = LogFactory.getLog(ThriftHBaseServiceHandler.class);
// nextScannerId and scannerMap are used to manage scanner state
@ -62,8 +73,15 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
private final Map<Integer, ResultScanner> scannerMap =
new ConcurrentHashMap<Integer, ResultScanner>();
public static THBaseService.Iface newInstance(Configuration conf, ThriftMetrics metrics) {
THBaseService.Iface handler = new ThriftHBaseServiceHandler(conf);
private final ConnectionCache connectionCache;
private final HTableFactory tableFactory;
private final int maxPoolSize;
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
public static THBaseService.Iface newInstance(
THBaseService.Iface handler, ThriftMetrics metrics) {
return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
}
@ -98,13 +116,41 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
return System.nanoTime();
}
ThriftHBaseServiceHandler(Configuration conf) {
int maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
htablePool = new HTablePool(conf, maxPoolSize);
ThriftHBaseServiceHandler(final Configuration conf,
final UserProvider userProvider) throws IOException {
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCache = new ConnectionCache(
conf, userProvider, cleanInterval, maxIdleTime);
tableFactory = new HTableFactory() {
@Override
public HTableInterface createHTableInterface(Configuration config,
byte[] tableName) {
try {
return connectionCache.getTable(Bytes.toString(tableName));
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
};
htablePools = CacheBuilder.newBuilder().expireAfterAccess(
maxIdleTime, TimeUnit.MILLISECONDS).softValues().concurrencyLevel(4).build();
maxPoolSize = conf.getInt("hbase.thrift.htablepool.size.max", 1000);
htablePoolCreater = new Callable<HTablePool>() {
public HTablePool call() {
return new HTablePool(conf, maxPoolSize, tableFactory);
}
};
}
private HTableInterface getTable(ByteBuffer tableName) {
return htablePool.getTable(byteBufferToByteArray(tableName));
String currentUser = connectionCache.getEffectiveUser();
try {
HTablePool htablePool = htablePools.get(currentUser, htablePoolCreater);
return htablePool.getTable(byteBufferToByteArray(tableName));
} catch (ExecutionException ee) {
throw new RuntimeException(ee);
}
}
private void closeTable(HTableInterface table) throws TIOError {
@ -141,6 +187,10 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
return scannerMap.get(id);
}
void setEffectiveUser(String effectiveUser) {
connectionCache.setEffectiveUser(effectiveUser);
}
/**
* Removes the scanner associated with the specified ID from the internal HashMap.
* @param id of the Scanner to remove

View File

@ -22,12 +22,21 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServer;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@ -43,13 +52,22 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.security.SecurityUtil;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.CallQueue;
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
@ -58,6 +76,7 @@ import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
@ -74,8 +93,20 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ThriftServer {
private static final Log log = LogFactory.getLog(ThriftServer.class);
public static final String DEFAULT_LISTEN_PORT = "9090";
/**
* Thrift quality of protection configuration key. Valid values can be:
* auth-conf: authentication, integrity and confidentiality checking
* auth-int: authentication and integrity checking
* auth: authentication only
*
* This is used to authenticate the callers and support impersonation.
* The thrift server and the HBase cluster must run in secure mode.
*/
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
public static final int DEFAULT_LISTEN_PORT = 9090;
public ThriftServer() {
}
@ -125,12 +156,50 @@ public class ThriftServer {
}
}
private static TTransportFactory getTTransportFactory(boolean framed, int frameSize) {
private static TTransportFactory getTTransportFactory(
String qop, String name, String host, boolean framed, int frameSize) {
if (framed) {
if (qop != null) {
throw new RuntimeException("Thrift server authentication"
+ " doesn't work with framed transport yet");
}
log.debug("Using framed transport");
return new TFramedTransport.Factory(frameSize);
} else {
} else if (qop == null) {
return new TTransportFactory();
} else {
Map<String, String> saslProperties = new HashMap<String, String>();
saslProperties.put(Sasl.QOP, qop);
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
new SaslGssCallbackHandler() {
@Override
public void handle(Callback[] callbacks)
throws UnsupportedCallbackException {
AuthorizeCallback ac = null;
for (Callback callback : callbacks) {
if (callback instanceof AuthorizeCallback) {
ac = (AuthorizeCallback) callback;
} else {
throw new UnsupportedCallbackException(callback,
"Unrecognized SASL GSSAPI Callback");
}
}
if (ac != null) {
String authid = ac.getAuthenticationID();
String authzid = ac.getAuthorizationID();
if (!authid.equals(authzid)) {
ac.setAuthorized(false);
} else {
ac.setAuthorized(true);
String userName = SecurityUtil.getUserFromPrincipal(authzid);
log.info("Effective user: " + userName);
ac.setAuthorizedID(userName);
}
}
}
});
return saslFactory;
}
}
@ -150,7 +219,7 @@ public class ThriftServer {
}
}
private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
@ -162,7 +231,7 @@ public class ThriftServer {
}
private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
THBaseService.Processor processor, TTransportFactory transportFactory,
TProcessor processor, TTransportFactory transportFactory,
InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
throws TTransportException {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
@ -188,7 +257,7 @@ public class ThriftServer {
Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
}
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, THBaseService.Processor processor,
private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
@ -242,11 +311,49 @@ public class ThriftServer {
// Get port to bind to
int listenPort = 0;
try {
listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
if (cmd.hasOption("port")) {
listenPort = Integer.parseInt(cmd.getOptionValue("port"));
} else {
listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
}
} catch (NumberFormatException e) {
throw new RuntimeException("Could not parse the value provided for the port option", e);
}
// Local hostname and user name,
// used only if QOP is configured.
String host = null;
String name = null;
UserProvider userProvider = UserProvider.instantiate(conf);
// login the server principal (if using secure Hadoop)
boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
&& userProvider.isHBaseSecurityEnabled();
if (securityEnabled) {
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.thrift.dns.interface", "default"),
conf.get("hbase.thrift.dns.nameserver", "default")));
userProvider.login("hbase.thrift.keytab.file",
"hbase.thrift.kerberos.principal", host);
}
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
String qop = conf.get(THRIFT_QOP_KEY);
if (qop != null) {
if (!qop.equals("auth") && !qop.equals("auth-int")
&& !qop.equals("auth-conf")) {
throw new IOException("Invalid " + THRIFT_QOP_KEY + ": " + qop
+ ", it must be 'auth', 'auth-int', or 'auth-conf'");
}
if (!securityEnabled) {
throw new IOException("Thrift server must"
+ " run in secure mode to support authentication");
}
// Extract the name from the principal
name = SecurityUtil.getUserFromPrincipal(
conf.get("hbase.thrift.kerberos.principal"));
}
boolean nonblocking = cmd.hasOption("nonblocking");
boolean hsha = cmd.hasOption("hsha");
@ -267,17 +374,35 @@ public class ThriftServer {
boolean compact = cmd.hasOption("compact") ||
conf.getBoolean("hbase.regionserver.thrift.compact", false);
TProtocolFactory protocolFactory = getTProtocolFactory(compact);
final ThriftHBaseServiceHandler hbaseHandler =
new ThriftHBaseServiceHandler(conf, userProvider);
THBaseService.Iface handler =
ThriftHBaseServiceHandler.newInstance(conf, metrics);
THBaseService.Processor processor = new THBaseService.Processor(handler);
ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
final THBaseService.Processor p = new THBaseService.Processor(handler);
conf.setBoolean("hbase.regionserver.thrift.compact", compact);
TProcessor processor = p;
boolean framed = cmd.hasOption("framed") ||
conf.getBoolean("hbase.regionserver.thrift.framed", false) || nonblocking || hsha;
TTransportFactory transportFactory = getTTransportFactory(framed,
TTransportFactory transportFactory = getTTransportFactory(qop, name, host, framed,
conf.getInt("hbase.regionserver.thrift.framed.max_frame_size_in_mb", 2) * 1024 * 1024);
InetSocketAddress inetSocketAddress = bindToPort(cmd.getOptionValue("bind"), listenPort);
conf.setBoolean("hbase.regionserver.thrift.framed", framed);
if (qop != null) {
// Create a processor wrapper, to get the caller
processor = new TProcessor() {
@Override
public boolean process(TProtocol inProt,
TProtocol outProt) throws TException {
TSaslServerTransport saslServerTransport =
(TSaslServerTransport)inProt.getTransport();
SaslServer saslServer = saslServerTransport.getSaslServer();
String principal = saslServer.getAuthorizationID();
hbaseHandler.setEffectiveUser(principal);
return p.process(inProt, outProt);
}
};
}
// check for user-defined info server port setting, if so override the conf
try {
@ -309,6 +434,15 @@ public class ThriftServer {
} else {
server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
}
server.serve();
final TServer tserver = server;
realUser.doAs(
new PrivilegedAction<Object>() {
@Override
public Object run() {
tserver.serve();
return null;
}
});
}
}

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.TAppend;
@ -62,6 +63,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@ -137,6 +139,7 @@ public class TestThriftHBaseServiceHandler {
tableDescriptor.addFamily(family);
}
admin.createTable(tableDescriptor);
admin.close();
}
@AfterClass
@ -149,8 +152,13 @@ public class TestThriftHBaseServiceHandler {
}
private ThriftHBaseServiceHandler createHandler() {
return new ThriftHBaseServiceHandler(UTIL.getConfiguration());
private ThriftHBaseServiceHandler createHandler() throws TException {
try {
Configuration conf = UTIL.getConfiguration();
return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
} catch (IOException ie) {
throw new TException(ie);
}
}
@Test
@ -831,8 +839,9 @@ public class TestThriftHBaseServiceHandler {
public void testMetrics() throws Exception {
Configuration conf = UTIL.getConfiguration();
ThriftMetrics metrics = getMetrics(conf);
ThriftHBaseServiceHandler hbaseHandler = createHandler();
THBaseService.Iface handler =
ThriftHBaseServiceHandler.newInstance(conf, metrics);
ThriftHBaseServiceHandler.newInstance(hbaseHandler, metrics);
byte[] rowName = "testMetrics".getBytes();
ByteBuffer table = wrap(tableAname);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.visibility.ScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.SimpleScanLabelGenerator;
import org.apache.hadoop.hbase.security.visibility.VisibilityClient;
@ -142,6 +143,7 @@ public static void beforeClass() throws Exception {
tableDescriptor.addFamily(family);
}
admin.createTable(tableDescriptor);
admin.close();
setAuths();
}
@ -179,8 +181,8 @@ public void setup() throws Exception {
}
private ThriftHBaseServiceHandler createHandler() {
return new ThriftHBaseServiceHandler(UTIL.getConfiguration());
private ThriftHBaseServiceHandler createHandler() throws IOException {
return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
}
@Test