HBASE-14512 Cache UGI groups
Summary: Don't create a user for every call. Instead create one user per connection. Then inside of SecureHadoopUser cache the groupNames. This allows HBase to cache empty groups. This is needed since it's much more likely that HBase will be accessed by a user that's not present on the posix /etc/group file. Test Plan: Unit Tests Differential Revision: https://reviews.facebook.net/D47751
This commit is contained in:
parent
4812d9a178
commit
cca2ba4d93
|
@ -24,7 +24,8 @@ import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import com.google.common.cache.LoadingCache;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
@ -278,15 +279,25 @@ public abstract class User {
|
||||||
* {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop
|
* {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop
|
||||||
* 0.20 and versions 0.21 and above.
|
* 0.20 and versions 0.21 and above.
|
||||||
*/
|
*/
|
||||||
private static class SecureHadoopUser extends User {
|
@InterfaceAudience.Private
|
||||||
|
public static final class SecureHadoopUser extends User {
|
||||||
private String shortName;
|
private String shortName;
|
||||||
|
private LoadingCache<UserGroupInformation, String[]> cache;
|
||||||
|
|
||||||
private SecureHadoopUser() throws IOException {
|
public SecureHadoopUser() throws IOException {
|
||||||
ugi = UserGroupInformation.getCurrentUser();
|
ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
this.cache = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private SecureHadoopUser(UserGroupInformation ugi) {
|
public SecureHadoopUser(UserGroupInformation ugi) {
|
||||||
this.ugi = ugi;
|
this.ugi = ugi;
|
||||||
|
this.cache = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SecureHadoopUser(UserGroupInformation ugi,
|
||||||
|
LoadingCache<UserGroupInformation, String[]> cache) {
|
||||||
|
this.ugi = ugi;
|
||||||
|
this.cache = cache;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -301,6 +312,18 @@ public abstract class User {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getGroupNames() {
|
||||||
|
if (cache != null) {
|
||||||
|
try {
|
||||||
|
return this.cache.get(ugi);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
return new String[0];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ugi.getGroupNames();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> T runAs(PrivilegedAction<T> action) {
|
public <T> T runAs(PrivilegedAction<T> action) {
|
||||||
return ugi.doAs(action);
|
return ugi.doAs(action);
|
||||||
|
|
|
@ -18,9 +18,20 @@
|
||||||
package org.apache.hadoop.hbase.security;
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import com.google.common.cache.CacheBuilder;
|
||||||
|
import com.google.common.cache.CacheLoader;
|
||||||
|
import com.google.common.cache.LoadingCache;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.hbase.BaseConfigurable;
|
import org.apache.hadoop.hbase.BaseConfigurable;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
@ -32,6 +43,50 @@ import org.apache.hadoop.util.ReflectionUtils;
|
||||||
public class UserProvider extends BaseConfigurable {
|
public class UserProvider extends BaseConfigurable {
|
||||||
|
|
||||||
private static final String USER_PROVIDER_CONF_KEY = "hbase.client.userprovider.class";
|
private static final String USER_PROVIDER_CONF_KEY = "hbase.client.userprovider.class";
|
||||||
|
private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator(
|
||||||
|
Executors.newScheduledThreadPool(
|
||||||
|
1,
|
||||||
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("group-cache-%d").build()));
|
||||||
|
|
||||||
|
private LoadingCache<UserGroupInformation, String[]> groupCache = null;
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
super.setConf(conf);
|
||||||
|
long cacheTimeout =
|
||||||
|
getConf().getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
|
||||||
|
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
|
||||||
|
|
||||||
|
this.groupCache = CacheBuilder.newBuilder()
|
||||||
|
// This is the same timeout that hadoop uses. So we'll follow suit.
|
||||||
|
.refreshAfterWrite(cacheTimeout, TimeUnit.MILLISECONDS)
|
||||||
|
.expireAfterWrite(10 * cacheTimeout, TimeUnit.MILLISECONDS)
|
||||||
|
// Set concurrency level equal to the default number of handlers that
|
||||||
|
// the simple handler spins up.
|
||||||
|
.concurrencyLevel(20)
|
||||||
|
// create the loader
|
||||||
|
// This just delegates to UGI.
|
||||||
|
.build(new CacheLoader<UserGroupInformation, String[]>() {
|
||||||
|
@Override
|
||||||
|
public String[] load(UserGroupInformation ugi) throws Exception {
|
||||||
|
return ugi.getGroupNames();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Provide the reload function that uses the executor thread.
|
||||||
|
public ListenableFuture<String[]> reload(final UserGroupInformation k,
|
||||||
|
String[] oldValue) throws Exception {
|
||||||
|
|
||||||
|
return executor.submit(new Callable<String[]>() {
|
||||||
|
UserGroupInformation userGroupInformation = k;
|
||||||
|
@Override
|
||||||
|
public String[] call() throws Exception {
|
||||||
|
return userGroupInformation.getGroupNames();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate the {@link UserProvider} specified in the configuration and set the passed
|
* Instantiate the {@link UserProvider} specified in the configuration and set the passed
|
||||||
|
@ -94,7 +149,10 @@ public class UserProvider extends BaseConfigurable {
|
||||||
* @return User
|
* @return User
|
||||||
*/
|
*/
|
||||||
public User create(UserGroupInformation ugi) {
|
public User create(UserGroupInformation ugi) {
|
||||||
return User.create(ugi);
|
if (ugi == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new User.SecureHadoopUser(ugi, groupCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -85,7 +85,7 @@ public class CallRunner {
|
||||||
this.status.setStatus("Setting up call");
|
this.status.setStatus("Setting up call");
|
||||||
this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
|
this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
|
||||||
if (RpcServer.LOG.isTraceEnabled()) {
|
if (RpcServer.LOG.isTraceEnabled()) {
|
||||||
UserGroupInformation remoteUser = call.connection.user;
|
UserGroupInformation remoteUser = call.connection.ugi;
|
||||||
RpcServer.LOG.trace(call.toShortString() + " executing as " +
|
RpcServer.LOG.trace(call.toShortString() + " executing as " +
|
||||||
((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
|
((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -324,7 +324,7 @@ public class RpcServer implements RpcServerInterface {
|
||||||
this.isError = false;
|
this.isError = false;
|
||||||
this.size = size;
|
this.size = size;
|
||||||
this.tinfo = tinfo;
|
this.tinfo = tinfo;
|
||||||
this.user = connection.user == null? null: userProvider.create(connection.user);
|
this.user = connection.user;
|
||||||
this.remoteAddress = remoteAddress;
|
this.remoteAddress = remoteAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -531,7 +531,7 @@ public class RpcServer implements RpcServerInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
public UserGroupInformation getRemoteUser() {
|
public UserGroupInformation getRemoteUser() {
|
||||||
return connection.user;
|
return connection.ugi;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1214,7 +1214,7 @@ public class RpcServer implements RpcServerInterface {
|
||||||
*/
|
*/
|
||||||
private CompressionCodec compressionCodec;
|
private CompressionCodec compressionCodec;
|
||||||
BlockingService service;
|
BlockingService service;
|
||||||
protected UserGroupInformation user = null;
|
|
||||||
private AuthMethod authMethod;
|
private AuthMethod authMethod;
|
||||||
private boolean saslContextEstablished;
|
private boolean saslContextEstablished;
|
||||||
private boolean skipInitialSaslHandshake;
|
private boolean skipInitialSaslHandshake;
|
||||||
|
@ -1237,6 +1237,8 @@ public class RpcServer implements RpcServerInterface {
|
||||||
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
|
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null);
|
||||||
|
|
||||||
public UserGroupInformation attemptingUser = null; // user name before auth
|
public UserGroupInformation attemptingUser = null; // user name before auth
|
||||||
|
protected User user = null;
|
||||||
|
protected UserGroupInformation ugi = null;
|
||||||
|
|
||||||
public Connection(SocketChannel channel, long lastContact) {
|
public Connection(SocketChannel channel, long lastContact) {
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
|
@ -1417,14 +1419,14 @@ public class RpcServer implements RpcServerInterface {
|
||||||
if (saslServer.isComplete()) {
|
if (saslServer.isComplete()) {
|
||||||
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
|
||||||
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
|
||||||
user = getAuthorizedUgi(saslServer.getAuthorizationID());
|
ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("SASL server context established. Authenticated client: "
|
LOG.debug("SASL server context established. Authenticated client: "
|
||||||
+ user + ". Negotiated QoP is "
|
+ ugi + ". Negotiated QoP is "
|
||||||
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
+ saslServer.getNegotiatedProperty(Sasl.QOP));
|
||||||
}
|
}
|
||||||
metrics.authenticationSuccess();
|
metrics.authenticationSuccess();
|
||||||
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user);
|
AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
|
||||||
saslContextEstablished = true;
|
saslContextEstablished = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1647,32 +1649,32 @@ public class RpcServer implements RpcServerInterface {
|
||||||
setupCellBlockCodecs(this.connectionHeader);
|
setupCellBlockCodecs(this.connectionHeader);
|
||||||
UserGroupInformation protocolUser = createUser(connectionHeader);
|
UserGroupInformation protocolUser = createUser(connectionHeader);
|
||||||
if (!useSasl) {
|
if (!useSasl) {
|
||||||
user = protocolUser;
|
ugi = protocolUser;
|
||||||
if (user != null) {
|
if (ugi != null) {
|
||||||
user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
|
ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// user is authenticated
|
// user is authenticated
|
||||||
user.setAuthenticationMethod(authMethod.authenticationMethod);
|
ugi.setAuthenticationMethod(authMethod.authenticationMethod);
|
||||||
//Now we check if this is a proxy user case. If the protocol user is
|
//Now we check if this is a proxy user case. If the protocol user is
|
||||||
//different from the 'user', it is a proxy user scenario. However,
|
//different from the 'user', it is a proxy user scenario. However,
|
||||||
//this is not allowed if user authenticated with DIGEST.
|
//this is not allowed if user authenticated with DIGEST.
|
||||||
if ((protocolUser != null)
|
if ((protocolUser != null)
|
||||||
&& (!protocolUser.getUserName().equals(user.getUserName()))) {
|
&& (!protocolUser.getUserName().equals(ugi.getUserName()))) {
|
||||||
if (authMethod == AuthMethod.DIGEST) {
|
if (authMethod == AuthMethod.DIGEST) {
|
||||||
// Not allowed to doAs if token authentication is used
|
// Not allowed to doAs if token authentication is used
|
||||||
throw new AccessDeniedException("Authenticated user (" + user
|
throw new AccessDeniedException("Authenticated user (" + ugi
|
||||||
+ ") doesn't match what the client claims to be ("
|
+ ") doesn't match what the client claims to be ("
|
||||||
+ protocolUser + ")");
|
+ protocolUser + ")");
|
||||||
} else {
|
} else {
|
||||||
// Effective user can be different from authenticated user
|
// Effective user can be different from authenticated user
|
||||||
// for simple auth or kerberos auth
|
// for simple auth or kerberos auth
|
||||||
// The user is the real user. Now we create a proxy user
|
// The user is the real user. Now we create a proxy user
|
||||||
UserGroupInformation realUser = user;
|
UserGroupInformation realUser = ugi;
|
||||||
user = UserGroupInformation.createProxyUser(protocolUser
|
ugi = UserGroupInformation.createProxyUser(protocolUser
|
||||||
.getUserName(), realUser);
|
.getUserName(), realUser);
|
||||||
// Now the user is a proxy user, set Authentication method Proxy.
|
// Now the user is a proxy user, set Authentication method Proxy.
|
||||||
user.setAuthenticationMethod(AuthenticationMethod.PROXY);
|
ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1758,8 +1760,9 @@ public class RpcServer implements RpcServerInterface {
|
||||||
// Throw FatalConnectionException wrapping ACE so client does right thing and closes
|
// Throw FatalConnectionException wrapping ACE so client does right thing and closes
|
||||||
// down the connection instead of trying to read non-existent retun.
|
// down the connection instead of trying to read non-existent retun.
|
||||||
throw new AccessDeniedException("Connection from " + this + " for service " +
|
throw new AccessDeniedException("Connection from " + this + " for service " +
|
||||||
connectionHeader.getServiceName() + " is unauthorized for user: " + user);
|
connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
|
||||||
}
|
}
|
||||||
|
this.user = userProvider.create(this.ugi);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1862,11 +1865,11 @@ public class RpcServer implements RpcServerInterface {
|
||||||
// real user for the effective user, therefore not required to
|
// real user for the effective user, therefore not required to
|
||||||
// authorize real user. doAs is allowed only for simple or kerberos
|
// authorize real user. doAs is allowed only for simple or kerberos
|
||||||
// authentication
|
// authentication
|
||||||
if (user != null && user.getRealUser() != null
|
if (ugi != null && ugi.getRealUser() != null
|
||||||
&& (authMethod != AuthMethod.DIGEST)) {
|
&& (authMethod != AuthMethod.DIGEST)) {
|
||||||
ProxyUsers.authorize(user, this.getHostAddress(), conf);
|
ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
|
||||||
}
|
}
|
||||||
authorize(user, connectionHeader, getHostInetAddress());
|
authorize(ugi, connectionHeader, getHostInetAddress());
|
||||||
metrics.authorizationSuccess();
|
metrics.authorizationSuccess();
|
||||||
} catch (AuthorizationException ae) {
|
} catch (AuthorizationException ae) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
|
Loading…
Reference in New Issue