HBASE-11349 [Thrift] support authentication/impersonation
This commit is contained in:
parent
3e7cccfb4b
commit
dbfdf5bdb6
@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.util.HttpServerUtil;
|
|||||||
import org.apache.hadoop.hbase.util.Strings;
|
import org.apache.hadoop.hbase.util.Strings;
|
||||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.mortbay.jetty.Connector;
|
import org.mortbay.jetty.Connector;
|
||||||
import org.mortbay.jetty.Server;
|
import org.mortbay.jetty.Server;
|
||||||
import org.mortbay.jetty.nio.SelectChannelConnector;
|
import org.mortbay.jetty.nio.SelectChannelConnector;
|
||||||
@ -108,8 +107,7 @@ public class RESTServer implements Constants {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
|
RESTServlet servlet = RESTServlet.getInstance(conf, userProvider);
|
||||||
RESTServlet servlet = RESTServlet.getInstance(conf, realUser);
|
|
||||||
|
|
||||||
Options options = new Options();
|
Options options = new Options();
|
||||||
options.addOption("p", "port", true, "Port to bind to [default: 8080]");
|
options.addOption("p", "port", true, "Port to bind to [default: 8080]");
|
||||||
|
@ -19,24 +19,14 @@
|
|||||||
package org.apache.hadoop.hbase.rest;
|
package org.apache.hadoop.hbase.rest;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.locks.Lock;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Chore;
|
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
|
||||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||||
import org.apache.hadoop.hbase.filter.ParseFilter;
|
import org.apache.hadoop.hbase.filter.ParseFilter;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.ConnectionCache;
|
||||||
import org.apache.hadoop.hbase.util.KeyLocker;
|
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@ -49,98 +39,16 @@ public class RESTServlet implements Constants {
|
|||||||
private static RESTServlet INSTANCE;
|
private static RESTServlet INSTANCE;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final MetricsREST metrics = new MetricsREST();
|
private final MetricsREST metrics = new MetricsREST();
|
||||||
private final Map<String, ConnectionInfo>
|
private final ConnectionCache connectionCache;
|
||||||
connections = new ConcurrentHashMap<String, ConnectionInfo>();
|
|
||||||
private final KeyLocker<String> locker = new KeyLocker<String>();
|
|
||||||
private final UserGroupInformation realUser;
|
private final UserGroupInformation realUser;
|
||||||
|
|
||||||
static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
|
static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
|
||||||
static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
|
static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
|
||||||
|
|
||||||
private final ThreadLocal<UserGroupInformation> effectiveUser =
|
|
||||||
new ThreadLocal<UserGroupInformation>() {
|
|
||||||
protected UserGroupInformation initialValue() {
|
|
||||||
return realUser;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
UserGroupInformation getRealUser() {
|
UserGroupInformation getRealUser() {
|
||||||
return realUser;
|
return realUser;
|
||||||
}
|
}
|
||||||
|
|
||||||
// A chore to clean up idle connections.
|
|
||||||
private final Chore connectionCleaner;
|
|
||||||
private final Stoppable stoppable;
|
|
||||||
private UserProvider userProvider;
|
|
||||||
|
|
||||||
class ConnectionInfo {
|
|
||||||
final HConnection connection;
|
|
||||||
final String userName;
|
|
||||||
|
|
||||||
volatile HBaseAdmin admin;
|
|
||||||
private long lastAccessTime;
|
|
||||||
private boolean closed;
|
|
||||||
|
|
||||||
ConnectionInfo(HConnection conn, String user) {
|
|
||||||
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
|
|
||||||
connection = conn;
|
|
||||||
closed = false;
|
|
||||||
userName = user;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized boolean updateAccessTime() {
|
|
||||||
if (closed) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (connection.isAborted() || connection.isClosed()) {
|
|
||||||
LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
|
|
||||||
connections.remove(userName);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized boolean timedOut(int maxIdleTime) {
|
|
||||||
long timeoutTime = lastAccessTime + maxIdleTime;
|
|
||||||
if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
|
|
||||||
connections.remove(userName);
|
|
||||||
closed = true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class ConnectionCleaner extends Chore {
|
|
||||||
private final int maxIdleTime;
|
|
||||||
|
|
||||||
public ConnectionCleaner(int cleanInterval, int maxIdleTime) {
|
|
||||||
super("REST-ConnectionCleaner", cleanInterval, stoppable);
|
|
||||||
this.maxIdleTime = maxIdleTime;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void chore() {
|
|
||||||
for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
|
|
||||||
ConnectionInfo connInfo = entry.getValue();
|
|
||||||
if (connInfo.timedOut(maxIdleTime)) {
|
|
||||||
if (connInfo.admin != null) {
|
|
||||||
try {
|
|
||||||
connInfo.admin.close();
|
|
||||||
} catch (Throwable t) {
|
|
||||||
LOG.info("Got exception in closing idle admin", t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
connInfo.connection.close();
|
|
||||||
} catch (Throwable t) {
|
|
||||||
LOG.info("Got exception in closing idle connection", t);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the RESTServlet singleton instance
|
* @return the RESTServlet singleton instance
|
||||||
*/
|
*/
|
||||||
@ -153,11 +61,12 @@ public class RESTServlet implements Constants {
|
|||||||
* @param conf Existing configuration to use in rest servlet
|
* @param conf Existing configuration to use in rest servlet
|
||||||
* @param realUser the login user
|
* @param realUser the login user
|
||||||
* @return the RESTServlet singleton instance
|
* @return the RESTServlet singleton instance
|
||||||
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public synchronized static RESTServlet getInstance(Configuration conf,
|
public synchronized static RESTServlet getInstance(Configuration conf,
|
||||||
UserGroupInformation realUser) {
|
UserProvider userProvider) throws IOException {
|
||||||
if (INSTANCE == null) {
|
if (INSTANCE == null) {
|
||||||
INSTANCE = new RESTServlet(conf, realUser);
|
INSTANCE = new RESTServlet(conf, userProvider);
|
||||||
}
|
}
|
||||||
return INSTANCE;
|
return INSTANCE;
|
||||||
}
|
}
|
||||||
@ -169,52 +78,30 @@ public class RESTServlet implements Constants {
|
|||||||
/**
|
/**
|
||||||
* Constructor with existing configuration
|
* Constructor with existing configuration
|
||||||
* @param conf existing configuration
|
* @param conf existing configuration
|
||||||
* @param realUser the login user
|
* @param userProvider the login user provider
|
||||||
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
RESTServlet(final Configuration conf,
|
RESTServlet(final Configuration conf,
|
||||||
final UserGroupInformation realUser) {
|
final UserProvider userProvider) throws IOException {
|
||||||
this.userProvider = UserProvider.instantiate(conf);
|
this.realUser = userProvider.getCurrent().getUGI();
|
||||||
stoppable = new Stoppable() {
|
this.conf = conf;
|
||||||
private volatile boolean isStopped = false;
|
registerCustomFilter(conf);
|
||||||
@Override public void stop(String why) { isStopped = true;}
|
|
||||||
@Override public boolean isStopped() {return isStopped;}
|
|
||||||
};
|
|
||||||
|
|
||||||
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
|
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
|
||||||
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
|
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
|
||||||
connectionCleaner = new ConnectionCleaner(cleanInterval, maxIdleTime);
|
connectionCache = new ConnectionCache(
|
||||||
Threads.setDaemonThreadRunning(connectionCleaner.getThread());
|
conf, userProvider, cleanInterval, maxIdleTime);
|
||||||
|
|
||||||
this.realUser = realUser;
|
|
||||||
this.conf = conf;
|
|
||||||
registerCustomFilter(conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Caller doesn't close the admin afterwards.
|
|
||||||
* We need to manage it and close it properly.
|
|
||||||
*/
|
|
||||||
HBaseAdmin getAdmin() throws IOException {
|
HBaseAdmin getAdmin() throws IOException {
|
||||||
ConnectionInfo connInfo = getCurrentConnection();
|
return connectionCache.getAdmin();
|
||||||
if (connInfo.admin == null) {
|
|
||||||
Lock lock = locker.acquireLock(effectiveUser.get().getUserName());
|
|
||||||
try {
|
|
||||||
if (connInfo.admin == null) {
|
|
||||||
connInfo.admin = new HBaseAdmin(connInfo.connection);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return connInfo.admin;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Caller closes the table afterwards.
|
* Caller closes the table afterwards.
|
||||||
*/
|
*/
|
||||||
HTableInterface getTable(String tableName) throws IOException {
|
HTableInterface getTable(String tableName) throws IOException {
|
||||||
ConnectionInfo connInfo = getCurrentConnection();
|
return connectionCache.getTable(tableName);
|
||||||
return connInfo.connection.getTable(tableName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Configuration getConfiguration() {
|
Configuration getConfiguration() {
|
||||||
@ -234,30 +121,10 @@ public class RESTServlet implements Constants {
|
|||||||
return getConfiguration().getBoolean("hbase.rest.readonly", false);
|
return getConfiguration().getBoolean("hbase.rest.readonly", false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setEffectiveUser(UserGroupInformation effectiveUser) {
|
void setEffectiveUser(String effectiveUser) {
|
||||||
this.effectiveUser.set(effectiveUser);
|
connectionCache.setEffectiveUser(effectiveUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ConnectionInfo getCurrentConnection() throws IOException {
|
|
||||||
String userName = effectiveUser.get().getUserName();
|
|
||||||
ConnectionInfo connInfo = connections.get(userName);
|
|
||||||
if (connInfo == null || !connInfo.updateAccessTime()) {
|
|
||||||
Lock lock = locker.acquireLock(userName);
|
|
||||||
try {
|
|
||||||
connInfo = connections.get(userName);
|
|
||||||
if (connInfo == null) {
|
|
||||||
User user = userProvider.create(effectiveUser.get());
|
|
||||||
HConnection conn = HConnectionManager.createConnection(conf, user);
|
|
||||||
connInfo = new ConnectionInfo(conn, userName);
|
|
||||||
connections.put(userName, connInfo);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return connInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void registerCustomFilter(Configuration conf) {
|
private void registerCustomFilter(Configuration conf) {
|
||||||
String[] filterList = conf.getStrings(Constants.CUSTOM_FILTERS);
|
String[] filterList = conf.getStrings(Constants.CUSTOM_FILTERS);
|
||||||
if (filterList != null) {
|
if (filterList != null) {
|
||||||
|
@ -50,28 +50,27 @@ public class RESTServletContainer extends ServletContainer {
|
|||||||
public void service(final HttpServletRequest request,
|
public void service(final HttpServletRequest request,
|
||||||
final HttpServletResponse response) throws ServletException, IOException {
|
final HttpServletResponse response) throws ServletException, IOException {
|
||||||
final String doAsUserFromQuery = request.getParameter("doAs");
|
final String doAsUserFromQuery = request.getParameter("doAs");
|
||||||
Configuration conf = RESTServlet.getInstance().getConfiguration();
|
RESTServlet servlet = RESTServlet.getInstance();
|
||||||
final boolean proxyConfigured = conf.getBoolean("hbase.rest.support.proxyuser", false);
|
|
||||||
if (doAsUserFromQuery != null && !proxyConfigured) {
|
|
||||||
throw new ServletException("Support for proxyuser is not configured");
|
|
||||||
}
|
|
||||||
UserGroupInformation ugi = RESTServlet.getInstance().getRealUser();
|
|
||||||
if (doAsUserFromQuery != null) {
|
if (doAsUserFromQuery != null) {
|
||||||
|
Configuration conf = servlet.getConfiguration();
|
||||||
|
if (!conf.getBoolean("hbase.rest.support.proxyuser", false)) {
|
||||||
|
throw new ServletException("Support for proxyuser is not configured");
|
||||||
|
}
|
||||||
|
UserGroupInformation ugi = servlet.getRealUser();
|
||||||
// create and attempt to authorize a proxy user (the client is attempting
|
// create and attempt to authorize a proxy user (the client is attempting
|
||||||
// to do proxy user)
|
// to do proxy user)
|
||||||
ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
|
ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
|
||||||
// validate the proxy user authorization
|
// validate the proxy user authorization
|
||||||
try {
|
try {
|
||||||
ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
|
ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
|
||||||
} catch(AuthorizationException e) {
|
} catch(AuthorizationException e) {
|
||||||
throw new ServletException(e.getMessage());
|
throw new ServletException(e.getMessage());
|
||||||
}
|
}
|
||||||
|
servlet.setEffectiveUser(doAsUserFromQuery);
|
||||||
} else {
|
} else {
|
||||||
// the REST server would send the request without validating the proxy
|
String effectiveUser = request.getRemoteUser();
|
||||||
// user authorization
|
servlet.setEffectiveUser(effectiveUser);
|
||||||
ugi = UserGroupInformation.createProxyUser(request.getRemoteUser(), ugi);
|
|
||||||
}
|
}
|
||||||
RESTServlet.getInstance().setEffectiveUser(ugi);
|
|
||||||
super.service(request, response);
|
super.service(request, response);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Security related generic utility methods.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class SecurityUtil {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the user name from a principal
|
||||||
|
*/
|
||||||
|
public static String getUserFromPrincipal(final String principal) {
|
||||||
|
int i = principal.indexOf("/");
|
||||||
|
if (i == -1) {
|
||||||
|
i = principal.indexOf("@");
|
||||||
|
}
|
||||||
|
return (i > -1) ? principal.substring(0, i) : principal;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,203 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Chore;
|
||||||
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
|
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A utility to store user specific HConnections in memory.
|
||||||
|
* There is a chore to clean up connections idle for too long.
|
||||||
|
* This class is used by REST server and Thrift server to
|
||||||
|
* support authentication and impersonation.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ConnectionCache {
|
||||||
|
private static Logger LOG = Logger.getLogger(ConnectionCache.class);
|
||||||
|
|
||||||
|
private final Map<String, ConnectionInfo>
|
||||||
|
connections = new ConcurrentHashMap<String, ConnectionInfo>();
|
||||||
|
private final KeyLocker<String> locker = new KeyLocker<String>();
|
||||||
|
private final String realUserName;
|
||||||
|
private final UserGroupInformation realUser;
|
||||||
|
private final UserProvider userProvider;
|
||||||
|
private final Configuration conf;
|
||||||
|
|
||||||
|
private final ThreadLocal<String> effectiveUserNames =
|
||||||
|
new ThreadLocal<String>() {
|
||||||
|
protected String initialValue() {
|
||||||
|
return realUserName;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
public ConnectionCache(final Configuration conf,
|
||||||
|
final UserProvider userProvider,
|
||||||
|
final int cleanInterval, final int maxIdleTime) throws IOException {
|
||||||
|
Stoppable stoppable = new Stoppable() {
|
||||||
|
private volatile boolean isStopped = false;
|
||||||
|
@Override public void stop(String why) { isStopped = true;}
|
||||||
|
@Override public boolean isStopped() {return isStopped;}
|
||||||
|
};
|
||||||
|
|
||||||
|
Chore cleaner = new Chore("ConnectionCleaner", cleanInterval, stoppable) {
|
||||||
|
@Override
|
||||||
|
protected void chore() {
|
||||||
|
for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
|
||||||
|
ConnectionInfo connInfo = entry.getValue();
|
||||||
|
if (connInfo.timedOut(maxIdleTime)) {
|
||||||
|
if (connInfo.admin != null) {
|
||||||
|
try {
|
||||||
|
connInfo.admin.close();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.info("Got exception in closing idle admin", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
connInfo.connection.close();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.info("Got exception in closing idle connection", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Start the daemon cleaner chore
|
||||||
|
Threads.setDaemonThreadRunning(cleaner.getThread());
|
||||||
|
this.realUser = userProvider.getCurrent().getUGI();
|
||||||
|
this.realUserName = realUser.getShortUserName();
|
||||||
|
this.userProvider = userProvider;
|
||||||
|
this.conf = conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the current thread local effective user
|
||||||
|
*/
|
||||||
|
public void setEffectiveUser(String user) {
|
||||||
|
effectiveUserNames.set(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Caller doesn't close the admin afterwards.
|
||||||
|
* We need to manage it and close it properly.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public HBaseAdmin getAdmin() throws IOException {
|
||||||
|
ConnectionInfo connInfo = getCurrentConnection();
|
||||||
|
if (connInfo.admin == null) {
|
||||||
|
Lock lock = locker.acquireLock(effectiveUserNames.get());
|
||||||
|
try {
|
||||||
|
if (connInfo.admin == null) {
|
||||||
|
connInfo.admin = new HBaseAdmin(connInfo.connection);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return connInfo.admin;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Caller closes the table afterwards.
|
||||||
|
*/
|
||||||
|
public HTableInterface getTable(String tableName) throws IOException {
|
||||||
|
ConnectionInfo connInfo = getCurrentConnection();
|
||||||
|
return connInfo.connection.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the cached connection for the current user.
|
||||||
|
* If none or timed out, create a new one.
|
||||||
|
*/
|
||||||
|
ConnectionInfo getCurrentConnection() throws IOException {
|
||||||
|
String userName = effectiveUserNames.get();
|
||||||
|
ConnectionInfo connInfo = connections.get(userName);
|
||||||
|
if (connInfo == null || !connInfo.updateAccessTime()) {
|
||||||
|
Lock lock = locker.acquireLock(userName);
|
||||||
|
try {
|
||||||
|
connInfo = connections.get(userName);
|
||||||
|
if (connInfo == null) {
|
||||||
|
UserGroupInformation ugi = realUser;
|
||||||
|
if (!userName.equals(realUserName)) {
|
||||||
|
ugi = UserGroupInformation.createProxyUser(userName, realUser);
|
||||||
|
}
|
||||||
|
User user = userProvider.create(ugi);
|
||||||
|
HConnection conn = HConnectionManager.createConnection(conf, user);
|
||||||
|
connInfo = new ConnectionInfo(conn, userName);
|
||||||
|
connections.put(userName, connInfo);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return connInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
class ConnectionInfo {
|
||||||
|
final HConnection connection;
|
||||||
|
final String userName;
|
||||||
|
|
||||||
|
volatile HBaseAdmin admin;
|
||||||
|
private long lastAccessTime;
|
||||||
|
private boolean closed;
|
||||||
|
|
||||||
|
ConnectionInfo(HConnection conn, String user) {
|
||||||
|
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
connection = conn;
|
||||||
|
closed = false;
|
||||||
|
userName = user;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized boolean updateAccessTime() {
|
||||||
|
if (closed) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (connection.isAborted() || connection.isClosed()) {
|
||||||
|
LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
|
||||||
|
connections.remove(userName);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized boolean timedOut(int maxIdleTime) {
|
||||||
|
long timeoutTime = lastAccessTime + maxIdleTime;
|
||||||
|
if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
|
||||||
|
connections.remove(userName);
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -22,7 +22,7 @@ import org.apache.commons.lang.ArrayUtils;
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.util.HttpServerUtil;
|
import org.apache.hadoop.hbase.util.HttpServerUtil;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.mortbay.jetty.Server;
|
import org.mortbay.jetty.Server;
|
||||||
@ -49,7 +49,7 @@ public class HBaseRESTTestingUtility {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Inject the conf for the test by being first to make singleton
|
// Inject the conf for the test by being first to make singleton
|
||||||
RESTServlet.getInstance(conf, User.getCurrent().getUGI());
|
RESTServlet.getInstance(conf, UserProvider.instantiate(conf));
|
||||||
|
|
||||||
// set up the Jersey servlet container for Jetty
|
// set up the Jersey servlet container for Jetty
|
||||||
ServletHolder sh = new ServletHolder(ServletContainer.class);
|
ServletHolder sh = new ServletHolder(ServletContainer.class);
|
||||||
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.rest.model.CellModel;
|
|||||||
import org.apache.hadoop.hbase.rest.model.CellSetModel;
|
import org.apache.hadoop.hbase.rest.model.CellSetModel;
|
||||||
import org.apache.hadoop.hbase.rest.model.RowModel;
|
import org.apache.hadoop.hbase.rest.model.RowModel;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
@ -421,18 +422,18 @@ public class TestGetAndPutResource extends RowResourceBase {
|
|||||||
response = deleteRow(TABLE, ROW_4);
|
response = deleteRow(TABLE, ROW_4);
|
||||||
assertEquals(response.getCode(), 200);
|
assertEquals(response.getCode(), 200);
|
||||||
|
|
||||||
UserGroupInformation ugi = User.getCurrent().getUGI();
|
UserProvider userProvider = UserProvider.instantiate(conf);
|
||||||
METRICS_ASSERT.assertCounterGt("requests", 2l,
|
METRICS_ASSERT.assertCounterGt("requests", 2l,
|
||||||
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
|
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
|
||||||
|
|
||||||
METRICS_ASSERT.assertCounterGt("successfulGet", 0l,
|
METRICS_ASSERT.assertCounterGt("successfulGet", 0l,
|
||||||
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
|
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
|
||||||
|
|
||||||
METRICS_ASSERT.assertCounterGt("successfulPut", 0l,
|
METRICS_ASSERT.assertCounterGt("successfulPut", 0l,
|
||||||
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
|
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
|
||||||
|
|
||||||
METRICS_ASSERT.assertCounterGt("successfulDelete", 0l,
|
METRICS_ASSERT.assertCounterGt("successfulDelete", 0l,
|
||||||
RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
|
RESTServlet.getInstance(conf, userProvider).getMetrics().getSource());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -32,11 +32,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.http.InfoServer;
|
import org.apache.hadoop.hbase.http.InfoServer;
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
|
||||||
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
|
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
|
||||||
import org.apache.hadoop.hbase.util.Strings;
|
|
||||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
import org.apache.hadoop.net.DNS;
|
|
||||||
import org.apache.hadoop.util.Shell.ExitCodeException;
|
import org.apache.hadoop.util.Shell.ExitCodeException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -92,17 +89,6 @@ public class ThriftServer {
|
|||||||
void doMain(final String[] args) throws Exception {
|
void doMain(final String[] args) throws Exception {
|
||||||
processOptions(args);
|
processOptions(args);
|
||||||
|
|
||||||
UserProvider userProvider = UserProvider.instantiate(conf);
|
|
||||||
// login the server principal (if using secure Hadoop)
|
|
||||||
if (userProvider.isHadoopSecurityEnabled() && userProvider.isHBaseSecurityEnabled()) {
|
|
||||||
String machineName =
|
|
||||||
Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
|
||||||
conf.get("hbase.thrift.dns.interface", "default"),
|
|
||||||
conf.get("hbase.thrift.dns.nameserver", "default")));
|
|
||||||
userProvider
|
|
||||||
.login("hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", machineName);
|
|
||||||
}
|
|
||||||
|
|
||||||
serverRunner = new ThriftServerRunner(conf);
|
serverRunner = new ThriftServerRunner(conf);
|
||||||
|
|
||||||
// Put up info server.
|
// Put up info server.
|
||||||
@ -168,9 +154,10 @@ public class ThriftServer {
|
|||||||
|
|
||||||
// Get port to bind to
|
// Get port to bind to
|
||||||
try {
|
try {
|
||||||
int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION,
|
if (cmd.hasOption(PORT_OPTION)) {
|
||||||
String.valueOf(DEFAULT_LISTEN_PORT)));
|
int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
|
||||||
conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
|
conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
|
||||||
|
}
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
LOG.error("Could not parse the value provided for the port option", e);
|
LOG.error("Could not parse the value provided for the port option", e);
|
||||||
printUsageAndExit(options, -1);
|
printUsageAndExit(options, -1);
|
||||||
|
@ -25,6 +25,7 @@ import java.net.InetAddress;
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -38,6 +39,12 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.security.auth.callback.Callback;
|
||||||
|
import javax.security.auth.callback.UnsupportedCallbackException;
|
||||||
|
import javax.security.sasl.AuthorizeCallback;
|
||||||
|
import javax.security.sasl.Sasl;
|
||||||
|
import javax.security.sasl.SaslServer;
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.Option;
|
import org.apache.commons.cli.Option;
|
||||||
import org.apache.commons.cli.OptionGroup;
|
import org.apache.commons.cli.OptionGroup;
|
||||||
@ -70,6 +77,8 @@ import org.apache.hadoop.hbase.filter.Filter;
|
|||||||
import org.apache.hadoop.hbase.filter.ParseFilter;
|
import org.apache.hadoop.hbase.filter.ParseFilter;
|
||||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||||
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
|
||||||
|
import org.apache.hadoop.hbase.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
|
import org.apache.hadoop.hbase.thrift.CallQueue.Call;
|
||||||
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
|
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
|
||||||
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
|
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
|
||||||
@ -85,9 +94,16 @@ import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
|
|||||||
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
|
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
|
||||||
import org.apache.hadoop.hbase.thrift.generated.TScan;
|
import org.apache.hadoop.hbase.thrift.generated.TScan;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.ConnectionCache;
|
||||||
|
import org.apache.hadoop.hbase.util.Strings;
|
||||||
|
import org.apache.hadoop.net.DNS;
|
||||||
|
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.thrift.TException;
|
import org.apache.thrift.TException;
|
||||||
|
import org.apache.thrift.TProcessor;
|
||||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||||
import org.apache.thrift.protocol.TCompactProtocol;
|
import org.apache.thrift.protocol.TCompactProtocol;
|
||||||
|
import org.apache.thrift.protocol.TProtocol;
|
||||||
import org.apache.thrift.protocol.TProtocolFactory;
|
import org.apache.thrift.protocol.TProtocolFactory;
|
||||||
import org.apache.thrift.server.THsHaServer;
|
import org.apache.thrift.server.THsHaServer;
|
||||||
import org.apache.thrift.server.TNonblockingServer;
|
import org.apache.thrift.server.TNonblockingServer;
|
||||||
@ -96,6 +112,7 @@ import org.apache.thrift.server.TThreadedSelectorServer;
|
|||||||
import org.apache.thrift.transport.TFramedTransport;
|
import org.apache.thrift.transport.TFramedTransport;
|
||||||
import org.apache.thrift.transport.TNonblockingServerSocket;
|
import org.apache.thrift.transport.TNonblockingServerSocket;
|
||||||
import org.apache.thrift.transport.TNonblockingServerTransport;
|
import org.apache.thrift.transport.TNonblockingServerTransport;
|
||||||
|
import org.apache.thrift.transport.TSaslServerTransport;
|
||||||
import org.apache.thrift.transport.TServerSocket;
|
import org.apache.thrift.transport.TServerSocket;
|
||||||
import org.apache.thrift.transport.TServerTransport;
|
import org.apache.thrift.transport.TServerTransport;
|
||||||
import org.apache.thrift.transport.TTransportFactory;
|
import org.apache.thrift.transport.TTransportFactory;
|
||||||
@ -108,6 +125,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|||||||
* the Hbase API specified in the Hbase.thrift IDL file.
|
* the Hbase API specified in the Hbase.thrift IDL file.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public class ThriftServerRunner implements Runnable {
|
public class ThriftServerRunner implements Runnable {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
|
private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
|
||||||
@ -122,6 +140,17 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
||||||
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrift quality of protection configuration key. Valid values can be:
|
||||||
|
* auth-conf: authentication, integrity and confidentiality checking
|
||||||
|
* auth-int: authentication and integrity checking
|
||||||
|
* auth: authentication only
|
||||||
|
*
|
||||||
|
* This is used to authenticate the callers and support impersonation.
|
||||||
|
* The thrift server and the HBase cluster must run in secure mode.
|
||||||
|
*/
|
||||||
|
static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
|
||||||
|
|
||||||
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
|
||||||
public static final int DEFAULT_LISTEN_PORT = 9090;
|
public static final int DEFAULT_LISTEN_PORT = 9090;
|
||||||
private final int listenPort;
|
private final int listenPort;
|
||||||
@ -130,6 +159,11 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
volatile TServer tserver;
|
volatile TServer tserver;
|
||||||
private final Hbase.Iface handler;
|
private final Hbase.Iface handler;
|
||||||
private final ThriftMetrics metrics;
|
private final ThriftMetrics metrics;
|
||||||
|
private final HBaseHandler hbaseHandler;
|
||||||
|
private final UserGroupInformation realUser;
|
||||||
|
|
||||||
|
private final String qop;
|
||||||
|
private String host;
|
||||||
|
|
||||||
/** An enum of server implementation selections */
|
/** An enum of server implementation selections */
|
||||||
enum ImplType {
|
enum ImplType {
|
||||||
@ -230,15 +264,37 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public ThriftServerRunner(Configuration conf) throws IOException {
|
public ThriftServerRunner(Configuration conf) throws IOException {
|
||||||
this(conf, new ThriftServerRunner.HBaseHandler(conf));
|
UserProvider userProvider = UserProvider.instantiate(conf);
|
||||||
}
|
// login the server principal (if using secure Hadoop)
|
||||||
|
boolean securityEnabled = userProvider.isHadoopSecurityEnabled()
|
||||||
public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
|
&& userProvider.isHBaseSecurityEnabled();
|
||||||
|
if (securityEnabled) {
|
||||||
|
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
||||||
|
conf.get("hbase.thrift.dns.interface", "default"),
|
||||||
|
conf.get("hbase.thrift.dns.nameserver", "default")));
|
||||||
|
userProvider.login("hbase.thrift.keytab.file",
|
||||||
|
"hbase.thrift.kerberos.principal", host);
|
||||||
|
}
|
||||||
this.conf = HBaseConfiguration.create(conf);
|
this.conf = HBaseConfiguration.create(conf);
|
||||||
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
|
this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
|
||||||
this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
|
this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
|
||||||
handler.initMetrics(metrics);
|
this.hbaseHandler = new HBaseHandler(conf, userProvider);
|
||||||
this.handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
|
this.hbaseHandler.initMetrics(metrics);
|
||||||
|
this.handler = HbaseHandlerMetricsProxy.newInstance(
|
||||||
|
hbaseHandler, metrics, conf);
|
||||||
|
this.realUser = userProvider.getCurrent().getUGI();
|
||||||
|
qop = conf.get(THRIFT_QOP_KEY);
|
||||||
|
if (qop != null) {
|
||||||
|
if (!qop.equals("auth") && !qop.equals("auth-int")
|
||||||
|
&& !qop.equals("auth-conf")) {
|
||||||
|
throw new IOException("Invalid hbase.thrift.security.qop: " + qop
|
||||||
|
+ ", it must be 'auth', 'auth-int', or 'auth-conf'");
|
||||||
|
}
|
||||||
|
if (!securityEnabled) {
|
||||||
|
throw new IOException("Thrift server must"
|
||||||
|
+ " run in secure mode to support authentication");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -246,14 +302,21 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
realUser.doAs(
|
||||||
setupServer();
|
new PrivilegedAction<Object>() {
|
||||||
tserver.serve();
|
@Override
|
||||||
} catch (Exception e) {
|
public Object run() {
|
||||||
LOG.fatal("Cannot run ThriftServer", e);
|
try {
|
||||||
// Crash the process if the ThriftServer is not running
|
setupServer();
|
||||||
System.exit(-1);
|
tserver.serve();
|
||||||
}
|
} catch (Exception e) {
|
||||||
|
LOG.fatal("Cannot run ThriftServer", e);
|
||||||
|
// Crash the process if the ThriftServer is not running
|
||||||
|
System.exit(-1);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
@ -277,18 +340,72 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
protocolFactory = new TBinaryProtocol.Factory();
|
protocolFactory = new TBinaryProtocol.Factory();
|
||||||
}
|
}
|
||||||
|
|
||||||
Hbase.Processor<Hbase.Iface> processor =
|
final TProcessor p = new Hbase.Processor<Hbase.Iface>(handler);
|
||||||
new Hbase.Processor<Hbase.Iface>(handler);
|
|
||||||
ImplType implType = ImplType.getServerImpl(conf);
|
ImplType implType = ImplType.getServerImpl(conf);
|
||||||
|
TProcessor processor = p;
|
||||||
|
|
||||||
// Construct correct TransportFactory
|
// Construct correct TransportFactory
|
||||||
TTransportFactory transportFactory;
|
TTransportFactory transportFactory;
|
||||||
if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
|
if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
|
||||||
|
if (qop != null) {
|
||||||
|
throw new RuntimeException("Thrift server authentication"
|
||||||
|
+ " doesn't work with framed transport yet");
|
||||||
|
}
|
||||||
transportFactory = new TFramedTransport.Factory(
|
transportFactory = new TFramedTransport.Factory(
|
||||||
conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024);
|
conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2) * 1024 * 1024);
|
||||||
LOG.debug("Using framed transport");
|
LOG.debug("Using framed transport");
|
||||||
} else {
|
} else if (qop == null) {
|
||||||
transportFactory = new TTransportFactory();
|
transportFactory = new TTransportFactory();
|
||||||
|
} else {
|
||||||
|
// Extract the name from the principal
|
||||||
|
String name = SecurityUtil.getUserFromPrincipal(
|
||||||
|
conf.get("hbase.thrift.kerberos.principal"));
|
||||||
|
Map<String, String> saslProperties = new HashMap<String, String>();
|
||||||
|
saslProperties.put(Sasl.QOP, qop);
|
||||||
|
TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
|
||||||
|
saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
|
||||||
|
new SaslGssCallbackHandler() {
|
||||||
|
@Override
|
||||||
|
public void handle(Callback[] callbacks)
|
||||||
|
throws UnsupportedCallbackException {
|
||||||
|
AuthorizeCallback ac = null;
|
||||||
|
for (Callback callback : callbacks) {
|
||||||
|
if (callback instanceof AuthorizeCallback) {
|
||||||
|
ac = (AuthorizeCallback) callback;
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedCallbackException(callback,
|
||||||
|
"Unrecognized SASL GSSAPI Callback");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (ac != null) {
|
||||||
|
String authid = ac.getAuthenticationID();
|
||||||
|
String authzid = ac.getAuthorizationID();
|
||||||
|
if (!authid.equals(authzid)) {
|
||||||
|
ac.setAuthorized(false);
|
||||||
|
} else {
|
||||||
|
ac.setAuthorized(true);
|
||||||
|
String userName = SecurityUtil.getUserFromPrincipal(authzid);
|
||||||
|
LOG.info("Effective user: " + userName);
|
||||||
|
ac.setAuthorizedID(userName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
transportFactory = saslFactory;
|
||||||
|
|
||||||
|
// Create a processor wrapper, to get the caller
|
||||||
|
processor = new TProcessor() {
|
||||||
|
@Override
|
||||||
|
public boolean process(TProtocol inProt,
|
||||||
|
TProtocol outProt) throws TException {
|
||||||
|
TSaslServerTransport saslServerTransport =
|
||||||
|
(TSaslServerTransport)inProt.getTransport();
|
||||||
|
SaslServer saslServer = saslServerTransport.getSaslServer();
|
||||||
|
String principal = saslServer.getAuthorizationID();
|
||||||
|
hbaseHandler.setEffectiveUser(principal);
|
||||||
|
return p.process(inProt, outProt);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
|
if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
|
||||||
@ -415,7 +532,6 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
*/
|
*/
|
||||||
public static class HBaseHandler implements Hbase.Iface {
|
public static class HBaseHandler implements Hbase.Iface {
|
||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
protected volatile HBaseAdmin admin = null;
|
|
||||||
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
|
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
|
||||||
|
|
||||||
// nextScannerId and scannerMap are used to manage scanner state
|
// nextScannerId and scannerMap are used to manage scanner state
|
||||||
@ -423,6 +539,8 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
|
protected HashMap<Integer, ResultScannerWrapper> scannerMap = null;
|
||||||
private ThriftMetrics metrics = null;
|
private ThriftMetrics metrics = null;
|
||||||
|
|
||||||
|
private final ConnectionCache connectionCache;
|
||||||
|
|
||||||
private static ThreadLocal<Map<String, HTable>> threadLocalTables =
|
private static ThreadLocal<Map<String, HTable>> threadLocalTables =
|
||||||
new ThreadLocal<Map<String, HTable>>() {
|
new ThreadLocal<Map<String, HTable>>() {
|
||||||
@Override
|
@Override
|
||||||
@ -433,6 +551,9 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
|
|
||||||
IncrementCoalescer coalescer = null;
|
IncrementCoalescer coalescer = null;
|
||||||
|
|
||||||
|
static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
|
||||||
|
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a list of all the column families for a given htable.
|
* Returns a list of all the column families for a given htable.
|
||||||
*
|
*
|
||||||
@ -460,10 +581,10 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
*/
|
*/
|
||||||
public HTable getTable(final byte[] tableName) throws
|
public HTable getTable(final byte[] tableName) throws
|
||||||
IOException {
|
IOException {
|
||||||
String table = new String(tableName);
|
String table = Bytes.toString(tableName);
|
||||||
Map<String, HTable> tables = threadLocalTables.get();
|
Map<String, HTable> tables = threadLocalTables.get();
|
||||||
if (!tables.containsKey(table)) {
|
if (!tables.containsKey(table)) {
|
||||||
tables.put(table, new HTable(conf, tableName));
|
tables.put(table, (HTable)connectionCache.getTable(table));
|
||||||
}
|
}
|
||||||
return tables.get(table);
|
return tables.get(table);
|
||||||
}
|
}
|
||||||
@ -507,33 +628,27 @@ public class ThriftServerRunner implements Runnable {
|
|||||||
return scannerMap.remove(id);
|
return scannerMap.remove(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
protected HBaseHandler(final Configuration c,
|
||||||
* Constructs an HBaseHandler object.
|
final UserProvider userProvider) throws IOException {
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
protected HBaseHandler()
|
|
||||||
throws IOException {
|
|
||||||
this(HBaseConfiguration.create());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected HBaseHandler(final Configuration c) throws IOException {
|
|
||||||
this.conf = c;
|
this.conf = c;
|
||||||
scannerMap = new HashMap<Integer, ResultScannerWrapper>();
|
scannerMap = new HashMap<Integer, ResultScannerWrapper>();
|
||||||
this.coalescer = new IncrementCoalescer(this);
|
this.coalescer = new IncrementCoalescer(this);
|
||||||
|
|
||||||
|
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
|
||||||
|
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
|
||||||
|
connectionCache = new ConnectionCache(
|
||||||
|
conf, userProvider, cleanInterval, maxIdleTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain HBaseAdmin. Creates the instance if it is not already created.
|
* Obtain HBaseAdmin. Creates the instance if it is not already created.
|
||||||
*/
|
*/
|
||||||
private HBaseAdmin getHBaseAdmin() throws IOException {
|
private HBaseAdmin getHBaseAdmin() throws IOException {
|
||||||
if (admin == null) {
|
return connectionCache.getAdmin();
|
||||||
synchronized (this) {
|
}
|
||||||
if (admin == null) {
|
|
||||||
admin = new HBaseAdmin(conf);
|
void setEffectiveUser(String effectiveUser) {
|
||||||
}
|
connectionCache.setEffectiveUser(effectiveUser);
|
||||||
}
|
|
||||||
}
|
|
||||||
return admin;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||||||
import org.apache.hadoop.hbase.LargeTests;
|
import org.apache.hadoop.hbase.LargeTests;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.filter.ParseFilter;
|
import org.apache.hadoop.hbase.filter.ParseFilter;
|
||||||
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||||
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
|
import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
|
||||||
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
|
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
|
||||||
@ -137,7 +138,8 @@ public class TestThriftServer {
|
|||||||
*/
|
*/
|
||||||
public void doTestTableCreateDrop() throws Exception {
|
public void doTestTableCreateDrop() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
doTestTableCreateDrop(handler);
|
doTestTableCreateDrop(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -151,7 +153,7 @@ public class TestThriftServer {
|
|||||||
|
|
||||||
protected MySlowHBaseHandler(Configuration c)
|
protected MySlowHBaseHandler(Configuration c)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super(c);
|
super(c, UserProvider.instantiate(c));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -256,7 +258,8 @@ public class TestThriftServer {
|
|||||||
|
|
||||||
public void doTestIncrements() throws Exception {
|
public void doTestIncrements() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
createTestTables(handler);
|
createTestTables(handler);
|
||||||
doTestIncrements(handler);
|
doTestIncrements(handler);
|
||||||
dropTestTables(handler);
|
dropTestTables(handler);
|
||||||
@ -303,7 +306,8 @@ public class TestThriftServer {
|
|||||||
*/
|
*/
|
||||||
public void doTestTableMutations() throws Exception {
|
public void doTestTableMutations() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
doTestTableMutations(handler);
|
doTestTableMutations(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -381,7 +385,8 @@ public class TestThriftServer {
|
|||||||
public void doTestTableTimestampsAndColumns() throws Exception {
|
public void doTestTableTimestampsAndColumns() throws Exception {
|
||||||
// Setup
|
// Setup
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
handler.createTable(tableAname, getColumnDescriptors());
|
handler.createTable(tableAname, getColumnDescriptors());
|
||||||
|
|
||||||
// Apply timestamped Mutations to rowA
|
// Apply timestamped Mutations to rowA
|
||||||
@ -460,7 +465,8 @@ public class TestThriftServer {
|
|||||||
public void doTestTableScanners() throws Exception {
|
public void doTestTableScanners() throws Exception {
|
||||||
// Setup
|
// Setup
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
handler.createTable(tableAname, getColumnDescriptors());
|
handler.createTable(tableAname, getColumnDescriptors());
|
||||||
|
|
||||||
// Apply timestamped Mutations to rowA
|
// Apply timestamped Mutations to rowA
|
||||||
@ -579,7 +585,8 @@ public class TestThriftServer {
|
|||||||
*/
|
*/
|
||||||
public void doTestGetTableRegions() throws Exception {
|
public void doTestGetTableRegions() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
doTestGetTableRegions(handler);
|
doTestGetTableRegions(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -614,7 +621,8 @@ public class TestThriftServer {
|
|||||||
|
|
||||||
public void doTestGetRegionInfo() throws Exception {
|
public void doTestGetRegionInfo() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
doTestGetRegionInfo(handler);
|
doTestGetRegionInfo(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -642,7 +650,8 @@ public class TestThriftServer {
|
|||||||
*/
|
*/
|
||||||
public static void doTestAppend() throws Exception {
|
public static void doTestAppend() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
handler.createTable(tableAname, getColumnDescriptors());
|
handler.createTable(tableAname, getColumnDescriptors());
|
||||||
try {
|
try {
|
||||||
List<Mutation> mutations = new ArrayList<Mutation>(1);
|
List<Mutation> mutations = new ArrayList<Mutation>(1);
|
||||||
@ -675,7 +684,8 @@ public class TestThriftServer {
|
|||||||
*/
|
*/
|
||||||
public static void doTestCheckAndPut() throws Exception {
|
public static void doTestCheckAndPut() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
|
new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration(),
|
||||||
|
UserProvider.instantiate(UTIL.getConfiguration()));
|
||||||
handler.createTable(tableAname, getColumnDescriptors());
|
handler.createTable(tableAname, getColumnDescriptors());
|
||||||
try {
|
try {
|
||||||
List<Mutation> mutations = new ArrayList<Mutation>(1);
|
List<Mutation> mutations = new ArrayList<Mutation>(1);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user