diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java index 0701a062324..e6f13d6546d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServer.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.util.HttpServerUtil; import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.net.DNS; -import org.apache.hadoop.security.UserGroupInformation; import org.mortbay.jetty.Connector; import org.mortbay.jetty.Server; import org.mortbay.jetty.nio.SelectChannelConnector; @@ -108,8 +107,7 @@ public class RESTServer implements Constants { } } - UserGroupInformation realUser = userProvider.getCurrent().getUGI(); - RESTServlet servlet = RESTServlet.getInstance(conf, realUser); + RESTServlet servlet = RESTServlet.getInstance(conf, userProvider); Options options = new Options(); options.addOption("p", "port", true, "Port to bind to [default: 8080]"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java index 4f0bb89d089..87d1b0400a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServlet.java @@ -19,24 +19,14 @@ package org.apache.hadoop.hbase.rest; import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Chore; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.filter.ParseFilter; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.KeyLocker; -import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.ConnectionCache; import org.apache.hadoop.security.UserGroupInformation; import org.apache.log4j.Logger; @@ -49,98 +39,16 @@ public class RESTServlet implements Constants { private static RESTServlet INSTANCE; private final Configuration conf; private final MetricsREST metrics = new MetricsREST(); - private final Map - connections = new ConcurrentHashMap(); - private final KeyLocker locker = new KeyLocker(); + private final ConnectionCache connectionCache; private final UserGroupInformation realUser; static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval"; static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime"; - private final ThreadLocal effectiveUser = - new ThreadLocal() { - protected UserGroupInformation initialValue() { - return realUser; - } - }; - UserGroupInformation getRealUser() { return realUser; } - // A chore to clean up idle connections. - private final Chore connectionCleaner; - private final Stoppable stoppable; - private UserProvider userProvider; - - class ConnectionInfo { - final HConnection connection; - final String userName; - - volatile HBaseAdmin admin; - private long lastAccessTime; - private boolean closed; - - ConnectionInfo(HConnection conn, String user) { - lastAccessTime = EnvironmentEdgeManager.currentTimeMillis(); - connection = conn; - closed = false; - userName = user; - } - - synchronized boolean updateAccessTime() { - if (closed) { - return false; - } - if (connection.isAborted() || connection.isClosed()) { - LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache"); - connections.remove(userName); - return false; - } - lastAccessTime = EnvironmentEdgeManager.currentTimeMillis(); - return true; - } - - synchronized boolean timedOut(int maxIdleTime) { - long timeoutTime = lastAccessTime + maxIdleTime; - if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) { - connections.remove(userName); - closed = true; - } - return false; - } - } - - class ConnectionCleaner extends Chore { - private final int maxIdleTime; - - public ConnectionCleaner(int cleanInterval, int maxIdleTime) { - super("REST-ConnectionCleaner", cleanInterval, stoppable); - this.maxIdleTime = maxIdleTime; - } - - @Override - protected void chore() { - for (Map.Entry entry: connections.entrySet()) { - ConnectionInfo connInfo = entry.getValue(); - if (connInfo.timedOut(maxIdleTime)) { - if (connInfo.admin != null) { - try { - connInfo.admin.close(); - } catch (Throwable t) { - LOG.info("Got exception in closing idle admin", t); - } - } - try { - connInfo.connection.close(); - } catch (Throwable t) { - LOG.info("Got exception in closing idle connection", t); - } - } - } - } - } - /** * @return the RESTServlet singleton instance */ @@ -153,11 +61,12 @@ public class RESTServlet implements Constants { * @param conf Existing configuration to use in rest servlet * @param realUser the login user * @return the RESTServlet singleton instance + * @throws IOException */ public synchronized static RESTServlet getInstance(Configuration conf, - UserGroupInformation realUser) { + UserProvider userProvider) throws IOException { if (INSTANCE == null) { - INSTANCE = new RESTServlet(conf, realUser); + INSTANCE = new RESTServlet(conf, userProvider); } return INSTANCE; } @@ -169,52 +78,30 @@ public class RESTServlet implements Constants { /** * Constructor with existing configuration * @param conf existing configuration - * @param realUser the login user + * @param userProvider the login user provider + * @throws IOException */ RESTServlet(final Configuration conf, - final UserGroupInformation realUser) { - this.userProvider = UserProvider.instantiate(conf); - stoppable = new Stoppable() { - private volatile boolean isStopped = false; - @Override public void stop(String why) { isStopped = true;} - @Override public boolean isStopped() {return isStopped;} - }; + final UserProvider userProvider) throws IOException { + this.realUser = userProvider.getCurrent().getUGI(); + this.conf = conf; + registerCustomFilter(conf); int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); - connectionCleaner = new ConnectionCleaner(cleanInterval, maxIdleTime); - Threads.setDaemonThreadRunning(connectionCleaner.getThread()); - - this.realUser = realUser; - this.conf = conf; - registerCustomFilter(conf); + connectionCache = new ConnectionCache( + conf, userProvider, cleanInterval, maxIdleTime); } - /** - * Caller doesn't close the admin afterwards. - * We need to manage it and close it properly. - */ HBaseAdmin getAdmin() throws IOException { - ConnectionInfo connInfo = getCurrentConnection(); - if (connInfo.admin == null) { - Lock lock = locker.acquireLock(effectiveUser.get().getUserName()); - try { - if (connInfo.admin == null) { - connInfo.admin = new HBaseAdmin(connInfo.connection); - } - } finally { - lock.unlock(); - } - } - return connInfo.admin; + return connectionCache.getAdmin(); } /** * Caller closes the table afterwards. */ HTableInterface getTable(String tableName) throws IOException { - ConnectionInfo connInfo = getCurrentConnection(); - return connInfo.connection.getTable(tableName); + return connectionCache.getTable(tableName); } Configuration getConfiguration() { @@ -234,30 +121,10 @@ public class RESTServlet implements Constants { return getConfiguration().getBoolean("hbase.rest.readonly", false); } - void setEffectiveUser(UserGroupInformation effectiveUser) { - this.effectiveUser.set(effectiveUser); + void setEffectiveUser(String effectiveUser) { + connectionCache.setEffectiveUser(effectiveUser); } - private ConnectionInfo getCurrentConnection() throws IOException { - String userName = effectiveUser.get().getUserName(); - ConnectionInfo connInfo = connections.get(userName); - if (connInfo == null || !connInfo.updateAccessTime()) { - Lock lock = locker.acquireLock(userName); - try { - connInfo = connections.get(userName); - if (connInfo == null) { - User user = userProvider.create(effectiveUser.get()); - HConnection conn = HConnectionManager.createConnection(conf, user); - connInfo = new ConnectionInfo(conn, userName); - connections.put(userName, connInfo); - } - } finally { - lock.unlock(); - } - } - return connInfo; - } - private void registerCustomFilter(Configuration conf) { String[] filterList = conf.getStrings(Constants.CUSTOM_FILTERS); if (filterList != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServletContainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServletContainer.java index 5caba47cdad..c2f4e59182e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServletContainer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/RESTServletContainer.java @@ -50,28 +50,27 @@ public class RESTServletContainer extends ServletContainer { public void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { final String doAsUserFromQuery = request.getParameter("doAs"); - Configuration conf = RESTServlet.getInstance().getConfiguration(); - final boolean proxyConfigured = conf.getBoolean("hbase.rest.support.proxyuser", false); - if (doAsUserFromQuery != null && !proxyConfigured) { - throw new ServletException("Support for proxyuser is not configured"); - } - UserGroupInformation ugi = RESTServlet.getInstance().getRealUser(); + RESTServlet servlet = RESTServlet.getInstance(); if (doAsUserFromQuery != null) { + Configuration conf = servlet.getConfiguration(); + if (!conf.getBoolean("hbase.rest.support.proxyuser", false)) { + throw new ServletException("Support for proxyuser is not configured"); + } + UserGroupInformation ugi = servlet.getRealUser(); // create and attempt to authorize a proxy user (the client is attempting // to do proxy user) - ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi); + ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi); // validate the proxy user authorization try { ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf); } catch(AuthorizationException e) { throw new ServletException(e.getMessage()); } + servlet.setEffectiveUser(doAsUserFromQuery); } else { - // the REST server would send the request without validating the proxy - // user authorization - ugi = UserGroupInformation.createProxyUser(request.getRemoteUser(), ugi); + String effectiveUser = request.getRemoteUser(); + servlet.setEffectiveUser(effectiveUser); } - RESTServlet.getInstance().setEffectiveUser(ugi); super.service(request, response); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/SecurityUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/SecurityUtil.java new file mode 100644 index 00000000000..ce16ed1a8af --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/SecurityUtil.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.security; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Security related generic utility methods. + */ +@InterfaceAudience.Private +public class SecurityUtil { + + /** + * Get the user name from a principal + */ + public static String getUserFromPrincipal(final String principal) { + int i = principal.indexOf("/"); + if (i == -1) { + i = principal.indexOf("@"); + } + return (i > -1) ? principal.substring(0, i) : principal; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java new file mode 100644 index 00000000000..9f6f3b37122 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java @@ -0,0 +1,203 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Logger; + +/** + * A utility to store user specific HConnections in memory. + * There is a chore to clean up connections idle for too long. + * This class is used by REST server and Thrift server to + * support authentication and impersonation. + */ +@InterfaceAudience.Private +public class ConnectionCache { + private static Logger LOG = Logger.getLogger(ConnectionCache.class); + + private final Map + connections = new ConcurrentHashMap(); + private final KeyLocker locker = new KeyLocker(); + private final String realUserName; + private final UserGroupInformation realUser; + private final UserProvider userProvider; + private final Configuration conf; + + private final ThreadLocal effectiveUserNames = + new ThreadLocal() { + protected String initialValue() { + return realUserName; + } + }; + + public ConnectionCache(final Configuration conf, + final UserProvider userProvider, + final int cleanInterval, final int maxIdleTime) throws IOException { + Stoppable stoppable = new Stoppable() { + private volatile boolean isStopped = false; + @Override public void stop(String why) { isStopped = true;} + @Override public boolean isStopped() {return isStopped;} + }; + + Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) { + @Override + protected void chore() { + for (Map.Entry entry: connections.entrySet()) { + ConnectionInfo connInfo = entry.getValue(); + if (connInfo.timedOut(maxIdleTime)) { + if (connInfo.admin != null) { + try { + connInfo.admin.close(); + } catch (Throwable t) { + LOG.info("Got exception in closing idle admin", t); + } + } + try { + connInfo.connection.close(); + } catch (Throwable t) { + LOG.info("Got exception in closing idle connection", t); + } + } + } + } + }; + // Start the daemon cleaner chore + Threads.setDaemonThreadRunning(cleaner.getThread()); + this.realUser = userProvider.getCurrent().getUGI(); + this.realUserName = realUser.getShortUserName(); + this.userProvider = userProvider; + this.conf = conf; + } + + /** + * Set the current thread local effective user + */ + public void setEffectiveUser(String user) { + effectiveUserNames.set(user); + } + + /** + * Caller doesn't close the admin afterwards. + * We need to manage it and close it properly. + */ + @SuppressWarnings("deprecation") + public HBaseAdmin getAdmin() throws IOException { + ConnectionInfo connInfo = getCurrentConnection(); + if (connInfo.admin == null) { + Lock lock = locker.acquireLock(effectiveUserNames.get()); + try { + if (connInfo.admin == null) { + connInfo.admin = new HBaseAdmin(connInfo.connection); + } + } finally { + lock.unlock(); + } + } + return connInfo.admin; + } + + /** + * Caller closes the table afterwards. + */ + public HTableInterface getTable(String tableName) throws IOException { + ConnectionInfo connInfo = getCurrentConnection(); + return connInfo.connection.getTable(tableName); + } + + /** + * Get the cached connection for the current user. + * If none or timed out, create a new one. + */ + ConnectionInfo getCurrentConnection() throws IOException { + String userName = effectiveUserNames.get(); + ConnectionInfo connInfo = connections.get(userName); + if (connInfo == null || !connInfo.updateAccessTime()) { + Lock lock = locker.acquireLock(userName); + try { + connInfo = connections.get(userName); + if (connInfo == null) { + UserGroupInformation ugi = realUser; + if (!userName.equals(realUserName)) { + ugi = UserGroupInformation.createProxyUser(userName, realUser); + } + User user = userProvider.create(ugi); + HConnection conn = HConnectionManager.createConnection(conf, user); + connInfo = new ConnectionInfo(conn, userName); + connections.put(userName, connInfo); + } + } finally { + lock.unlock(); + } + } + return connInfo; + } + + class ConnectionInfo { + final HConnection connection; + final String userName; + + volatile HBaseAdmin admin; + private long lastAccessTime; + private boolean closed; + + ConnectionInfo(HConnection conn, String user) { + lastAccessTime = EnvironmentEdgeManager.currentTimeMillis(); + connection = conn; + closed = false; + userName = user; + } + + synchronized boolean updateAccessTime() { + if (closed) { + return false; + } + if (connection.isAborted() || connection.isClosed()) { + LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache"); + connections.remove(userName); + return false; + } + lastAccessTime = EnvironmentEdgeManager.currentTimeMillis(); + return true; + } + + synchronized boolean timedOut(int maxIdleTime) { + long timeoutTime = lastAccessTime + maxIdleTime; + if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) { + connections.remove(userName); + closed = true; + } + return false; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/HBaseRESTTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/HBaseRESTTestingUtility.java index 1353090c0a9..8a399e97a87 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/HBaseRESTTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/HBaseRESTTestingUtility.java @@ -22,7 +22,7 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.HttpServerUtil; import org.apache.hadoop.util.StringUtils; import org.mortbay.jetty.Server; @@ -49,7 +49,7 @@ public class HBaseRESTTestingUtility { } // Inject the conf for the test by being first to make singleton - RESTServlet.getInstance(conf, User.getCurrent().getUGI()); + RESTServlet.getInstance(conf, UserProvider.instantiate(conf)); // set up the Jersey servlet container for Jetty ServletHolder sh = new ServletHolder(ServletContainer.class); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java index 53e603f0045..c14f3e2645e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.RowModel; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; @@ -421,18 +422,18 @@ public class TestGetAndPutResource extends RowResourceBase { response = deleteRow(TABLE, ROW_4); assertEquals(response.getCode(), 200); - UserGroupInformation ugi = User.getCurrent().getUGI(); + UserProvider userProvider = UserProvider.instantiate(conf); METRICS_ASSERT.assertCounterGt("requests", 2l, - RESTServlet.getInstance(conf, ugi).getMetrics().getSource()); + RESTServlet.getInstance(conf, userProvider).getMetrics().getSource()); METRICS_ASSERT.assertCounterGt("successfulGet", 0l, - RESTServlet.getInstance(conf, ugi).getMetrics().getSource()); + RESTServlet.getInstance(conf, userProvider).getMetrics().getSource()); METRICS_ASSERT.assertCounterGt("successfulPut", 0l, - RESTServlet.getInstance(conf, ugi).getMetrics().getSource()); + RESTServlet.getInstance(conf, userProvider).getMetrics().getSource()); METRICS_ASSERT.assertCounterGt("successfulDelete", 0l, - RESTServlet.getInstance(conf, ugi).getMetrics().getSource()); + RESTServlet.getInstance(conf, userProvider).getMetrics().getSource()); } @Test diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java index 97988eb6d48..dfcdc8f7d92 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java @@ -32,11 +32,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.http.InfoServer; -import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType; -import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.VersionInfo; -import org.apache.hadoop.net.DNS; import org.apache.hadoop.util.Shell.ExitCodeException; /** @@ -92,17 +89,6 @@ public class ThriftServer { void doMain(final String[] args) throws Exception { processOptions(args); - UserProvider userProvider = UserProvider.instantiate(conf); - // login the server principal (if using secure Hadoop) - if (userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled()) { - String machineName = - Strings.domainNamePointerToHostName(DNS.getDefaultHost( - conf.get("hbase.thrift.dns.interface", "default"), - conf.get("hbase.thrift.dns.nameserver", "default"))); - userProvider - .login("hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", machineName); - } - serverRunner = new ThriftServerRunner(conf); // Put up info server. @@ -168,9 +154,10 @@ public class ThriftServer { // Get port to bind to try { - int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION, - String.valueOf(DEFAULT_LISTEN_PORT))); - conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort); + if (cmd.hasOption(PORT_OPTION)) { + int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION)); + conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort); + } } catch (NumberFormatException e) { LOG.error("Could not parse the value provided for the port option", e); printUsageAndExit(options, -1); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java index 4ddb6f8571c..252c5c20e93 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -25,6 +25,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -38,6 +39,12 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.AuthorizeCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslServer; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; @@ -70,6 +77,8 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.ParseFilter; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.security.SecurityUtil; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.thrift.CallQueue.Call; import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; import org.apache.hadoop.hbase.thrift.generated.BatchMutation; @@ -85,9 +94,16 @@ import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; import org.apache.hadoop.hbase.thrift.generated.TRowResult; import org.apache.hadoop.hbase.thrift.generated.TScan; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ConnectionCache; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.net.DNS; +import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.server.THsHaServer; import org.apache.thrift.server.TNonblockingServer; @@ -96,6 +112,7 @@ import org.apache.thrift.server.TThreadedSelectorServer; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TNonblockingServerSocket; import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TSaslServerTransport; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; import org.apache.thrift.transport.TTransportFactory; @@ -108,6 +125,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * the Hbase API specified in the Hbase.thrift IDL file. */ @InterfaceAudience.Private +@SuppressWarnings("deprecation") public class ThriftServerRunner implements Runnable { private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class); @@ -122,6 +140,17 @@ public class ThriftServerRunner implements Runnable { static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port"; static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement"; + /** + * Thrift quality of protection configuration key. Valid values can be: + * auth-conf: authentication, integrity and confidentiality checking + * auth-int: authentication and integrity checking + * auth: authentication only + * + * This is used to authenticate the callers and support impersonation. + * The thrift server and the HBase cluster must run in secure mode. + */ + static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop"; + private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; public static final int DEFAULT_LISTEN_PORT = 9090; private final int listenPort; @@ -130,6 +159,11 @@ public class ThriftServerRunner implements Runnable { volatile TServer tserver; private final Hbase.Iface handler; private final ThriftMetrics metrics; + private final HBaseHandler hbaseHandler; + private final UserGroupInformation realUser; + + private final String qop; + private String host; /** An enum of server implementation selections */ enum ImplType { @@ -230,15 +264,37 @@ public class ThriftServerRunner implements Runnable { } public ThriftServerRunner(Configuration conf) throws IOException { - this(conf, new ThriftServerRunner.HBaseHandler(conf)); - } - - public ThriftServerRunner(Configuration conf, HBaseHandler handler) { + UserProvider userProvider = UserProvider.instantiate(conf); + // login the server principal (if using secure Hadoop) + boolean securityEnabled = userProvider.isHadoopSecurityEnabled() + && userProvider.isHBaseSecurityEnabled(); + if (securityEnabled) { + host = Strings.domainNamePointerToHostName(DNS.getDefaultHost( + conf.get("hbase.thrift.dns.interface", "default"), + conf.get("hbase.thrift.dns.nameserver", "default"))); + userProvider.login("hbase.thrift.keytab.file", + "hbase.thrift.kerberos.principal", host); + } this.conf = HBaseConfiguration.create(conf); this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE); - handler.initMetrics(metrics); - this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf); + this.hbaseHandler = new HBaseHandler(conf, userProvider); + this.hbaseHandler.initMetrics(metrics); + this.handler = HbaseHandlerMetricsProxy.newInstance( + hbaseHandler, metrics, conf); + this.realUser = userProvider.getCurrent().getUGI(); + qop = conf.get(THRIFT_QOP_KEY); + if (qop != null) { + if (!qop.equals("auth") && !qop.equals("auth-int") + && !qop.equals("auth-conf")) { + throw new IOException("Invalid hbase.thrift.security.qop: " + qop + + ", it must be 'auth', 'auth-int', or 'auth-conf'"); + } + if (!securityEnabled) { + throw new IOException("Thrift server must" + + " run in secure mode to support authentication"); + } + } } /* @@ -246,14 +302,21 @@ public class ThriftServerRunner implements Runnable { */ @Override public void run() { - try { - setupServer(); - tserver.serve(); - } catch (Exception e) { - LOG.fatal("Cannot run ThriftServer", e); - // Crash the process if the ThriftServer is not running - System.exit(-1); - } + realUser.doAs( + new PrivilegedAction() { + @Override + public Object run() { + try { + setupServer(); + tserver.serve(); + } catch (Exception e) { + LOG.fatal("Cannot run ThriftServer", e); + // Crash the process if the ThriftServer is not running + System.exit(-1); + } + return null; + } + }); } public void shutdown() { @@ -277,18 +340,72 @@ public class ThriftServerRunner implements Runnable { protocolFactory = new TBinaryProtocol.Factory(); } - Hbase.Processor processor = - new Hbase.Processor(handler); + final TProcessor p = new Hbase.Processor(handler); ImplType implType = ImplType.getServerImpl(conf); + TProcessor processor = p; // Construct correct TransportFactory TTransportFactory transportFactory; if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) { + if (qop != null) { + throw new RuntimeException("Thrift server authentication" + + " doesn't work with framed transport yet"); + } transportFactory = new TFramedTransport.Factory( conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024); LOG.debug("Using framed transport"); - } else { + } else if (qop == null) { transportFactory = new TTransportFactory(); + } else { + // Extract the name from the principal + String name = SecurityUtil.getUserFromPrincipal( + conf.get("hbase.thrift.kerberos.principal")); + Map saslProperties = new HashMap(); + saslProperties.put(Sasl.QOP, qop); + TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory(); + saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties, + new SaslGssCallbackHandler() { + @Override + public void handle(Callback[] callbacks) + throws UnsupportedCallbackException { + AuthorizeCallback ac = null; + for (Callback callback : callbacks) { + if (callback instanceof AuthorizeCallback) { + ac = (AuthorizeCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, + "Unrecognized SASL GSSAPI Callback"); + } + } + if (ac != null) { + String authid = ac.getAuthenticationID(); + String authzid = ac.getAuthorizationID(); + if (!authid.equals(authzid)) { + ac.setAuthorized(false); + } else { + ac.setAuthorized(true); + String userName = SecurityUtil.getUserFromPrincipal(authzid); + LOG.info("Effective user: " + userName); + ac.setAuthorizedID(userName); + } + } + } + }); + transportFactory = saslFactory; + + // Create a processor wrapper, to get the caller + processor = new TProcessor() { + @Override + public boolean process(TProtocol inProt, + TProtocol outProt) throws TException { + TSaslServerTransport saslServerTransport = + (TSaslServerTransport)inProt.getTransport(); + SaslServer saslServer = saslServerTransport.getSaslServer(); + String principal = saslServer.getAuthorizationID(); + hbaseHandler.setEffectiveUser(principal); + return p.process(inProt, outProt); + } + }; } if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { @@ -415,7 +532,6 @@ public class ThriftServerRunner implements Runnable { */ public static class HBaseHandler implements Hbase.Iface { protected Configuration conf; - protected volatile HBaseAdmin admin = null; protected final Log LOG = LogFactory.getLog(this.getClass().getName()); // nextScannerId and scannerMap are used to manage scanner state @@ -423,6 +539,8 @@ public class ThriftServerRunner implements Runnable { protected HashMap scannerMap = null; private ThriftMetrics metrics = null; + private final ConnectionCache connectionCache; + private static ThreadLocal> threadLocalTables = new ThreadLocal>() { @Override @@ -433,6 +551,9 @@ public class ThriftServerRunner implements Runnable { IncrementCoalescer coalescer = null; + static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval"; + static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime"; + /** * Returns a list of all the column families for a given htable. * @@ -460,10 +581,10 @@ public class ThriftServerRunner implements Runnable { */ public HTable getTable(final byte[] tableName) throws IOException { - String table = new String(tableName); + String table = Bytes.toString(tableName); Map tables = threadLocalTables.get(); if (!tables.containsKey(table)) { - tables.put(table, new HTable(conf, tableName)); + tables.put(table, (HTable)connectionCache.getTable(table)); } return tables.get(table); } @@ -507,33 +628,27 @@ public class ThriftServerRunner implements Runnable { return scannerMap.remove(id); } - /** - * Constructs an HBaseHandler object. - * @throws IOException - */ - protected HBaseHandler() - throws IOException { - this(HBaseConfiguration.create()); - } - - protected HBaseHandler(final Configuration c) throws IOException { + protected HBaseHandler(final Configuration c, + final UserProvider userProvider) throws IOException { this.conf = c; scannerMap = new HashMap(); this.coalescer = new IncrementCoalescer(this); + + int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000); + int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000); + connectionCache = new ConnectionCache( + conf, userProvider, cleanInterval, maxIdleTime); } /** * Obtain HBaseAdmin. Creates the instance if it is not already created. */ private HBaseAdmin getHBaseAdmin() throws IOException { - if (admin == null) { - synchronized (this) { - if (admin == null) { - admin = new HBaseAdmin(conf); - } - } - } - return admin; + return connectionCache.getAdmin(); + } + + void setEffectiveUser(String effectiveUser) { + connectionCache.setEffectiveUser(effectiveUser); } @Override diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java index 0776012cf6e..555b12f6278 100644 --- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java +++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler; import org.apache.hadoop.hbase.thrift.generated.BatchMutation; @@ -137,7 +138,8 @@ public class TestThriftServer { */ public void doTestTableCreateDrop() throws Exception { ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); doTestTableCreateDrop(handler); } @@ -151,7 +153,7 @@ public class TestThriftServer { protected MySlowHBaseHandler(Configuration c) throws IOException { - super(c); + super(c, UserProvider.instantiate(c)); } @Override @@ -256,7 +258,8 @@ public class TestThriftServer { public void doTestIncrements() throws Exception { ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); createTestTables(handler); doTestIncrements(handler); dropTestTables(handler); @@ -303,7 +306,8 @@ public class TestThriftServer { */ public void doTestTableMutations() throws Exception { ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); doTestTableMutations(handler); } @@ -381,7 +385,8 @@ public class TestThriftServer { public void doTestTableTimestampsAndColumns() throws Exception { // Setup ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); handler.createTable(tableAname, getColumnDescriptors()); // Apply timestamped Mutations to rowA @@ -460,7 +465,8 @@ public class TestThriftServer { public void doTestTableScanners() throws Exception { // Setup ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); handler.createTable(tableAname, getColumnDescriptors()); // Apply timestamped Mutations to rowA @@ -579,7 +585,8 @@ public class TestThriftServer { */ public void doTestGetTableRegions() throws Exception { ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); doTestGetTableRegions(handler); } @@ -614,7 +621,8 @@ public class TestThriftServer { public void doTestGetRegionInfo() throws Exception { ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); doTestGetRegionInfo(handler); } @@ -642,7 +650,8 @@ public class TestThriftServer { */ public static void doTestAppend() throws Exception { ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); handler.createTable(tableAname, getColumnDescriptors()); try { List mutations = new ArrayList(1); @@ -675,7 +684,8 @@ public class TestThriftServer { */ public static void doTestCheckAndPut() throws Exception { ThriftServerRunner.HBaseHandler handler = - new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration()); + new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(), + UserProvider.instantiate(UTIL.getConfiguration())); handler.createTable(tableAname, getColumnDescriptors()); try { List mutations = new ArrayList(1);