diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java index 0efb4029be5..44197dbc363 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/User.java @@ -24,7 +24,8 @@ import java.lang.reflect.UndeclaredThrowableException; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Collection; - +import java.util.concurrent.ExecutionException; +import com.google.common.cache.LoadingCache; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -278,15 +279,25 @@ public abstract class User { * {@link org.apache.hadoop.security.UserGroupInformation} for secure Hadoop * 0.20 and versions 0.21 and above. */ - private static class SecureHadoopUser extends User { + @InterfaceAudience.Private + public static final class SecureHadoopUser extends User { private String shortName; + private LoadingCache cache; - private SecureHadoopUser() throws IOException { + public SecureHadoopUser() throws IOException { ugi = UserGroupInformation.getCurrentUser(); + this.cache = null; } - private SecureHadoopUser(UserGroupInformation ugi) { + public SecureHadoopUser(UserGroupInformation ugi) { this.ugi = ugi; + this.cache = null; + } + + public SecureHadoopUser(UserGroupInformation ugi, + LoadingCache cache) { + this.ugi = ugi; + this.cache = cache; } @Override @@ -301,6 +312,18 @@ public abstract class User { } } + @Override + public String[] getGroupNames() { + if (cache != null) { + try { + return this.cache.get(ugi); + } catch (ExecutionException e) { + return new String[0]; + } + } + return ugi.getGroupNames(); + } + @Override public T runAs(PrivilegedAction action) { return ugi.doAs(action); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java index 66df64580ab..33b8a94d05b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/security/UserProvider.java @@ -18,9 +18,20 @@ package org.apache.hadoop.hbase.security; import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hbase.BaseConfigurable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; @@ -32,6 +43,50 @@ import org.apache.hadoop.util.ReflectionUtils; public class UserProvider extends BaseConfigurable { private static final String USER_PROVIDER_CONF_KEY = "hbase.client.userprovider.class"; + private static final ListeningExecutorService executor = MoreExecutors.listeningDecorator( + Executors.newScheduledThreadPool( + 1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("group-cache-%d").build())); + + private LoadingCache groupCache = null; + + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + long cacheTimeout = + getConf().getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, + CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000; + + this.groupCache = CacheBuilder.newBuilder() + // This is the same timeout that hadoop uses. So we'll follow suit. + .refreshAfterWrite(cacheTimeout, TimeUnit.MILLISECONDS) + .expireAfterWrite(10 * cacheTimeout, TimeUnit.MILLISECONDS) + // Set concurrency level equal to the default number of handlers that + // the simple handler spins up. + .concurrencyLevel(20) + // create the loader + // This just delegates to UGI. + .build(new CacheLoader() { + @Override + public String[] load(UserGroupInformation ugi) throws Exception { + return ugi.getGroupNames(); + } + + // Provide the reload function that uses the executor thread. + public ListenableFuture reload(final UserGroupInformation k, + String[] oldValue) throws Exception { + + return executor.submit(new Callable() { + UserGroupInformation userGroupInformation = k; + @Override + public String[] call() throws Exception { + return userGroupInformation.getGroupNames(); + } + }); + } + }); + } /** * Instantiate the {@link UserProvider} specified in the configuration and set the passed @@ -94,7 +149,10 @@ public class UserProvider extends BaseConfigurable { * @return User */ public User create(UserGroupInformation ugi) { - return User.create(ugi); + if (ugi == null) { + return null; + } + return new User.SecureHadoopUser(ugi, groupCache); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 26ffa9579fa..38b7c91692f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -85,7 +85,7 @@ public class CallRunner { this.status.setStatus("Setting up call"); this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort()); if (RpcServer.LOG.isTraceEnabled()) { - UserGroupInformation remoteUser = call.connection.user; + UserGroupInformation remoteUser = call.connection.ugi; RpcServer.LOG.trace(call.toShortString() + " executing as " + ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName())); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 0ee93eb2a4b..7c239f5b31e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -324,7 +324,7 @@ public class RpcServer implements RpcServerInterface { this.isError = false; this.size = size; this.tinfo = tinfo; - this.user = connection.user == null? null: userProvider.create(connection.user); + this.user = connection.user; this.remoteAddress = remoteAddress; } @@ -531,7 +531,7 @@ public class RpcServer implements RpcServerInterface { } public UserGroupInformation getRemoteUser() { - return connection.user; + return connection.ugi; } @Override @@ -1214,7 +1214,7 @@ public class RpcServer implements RpcServerInterface { */ private CompressionCodec compressionCodec; BlockingService service; - protected UserGroupInformation user = null; + private AuthMethod authMethod; private boolean saslContextEstablished; private boolean skipInitialSaslHandshake; @@ -1237,6 +1237,8 @@ public class RpcServer implements RpcServerInterface { new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null); public UserGroupInformation attemptingUser = null; // user name before auth + protected User user = null; + protected UserGroupInformation ugi = null; public Connection(SocketChannel channel, long lastContact) { this.channel = channel; @@ -1417,14 +1419,14 @@ public class RpcServer implements RpcServerInterface { if (saslServer.isComplete()) { String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP); useWrap = qop != null && !"auth".equalsIgnoreCase(qop); - user = getAuthorizedUgi(saslServer.getAuthorizationID()); + ugi = getAuthorizedUgi(saslServer.getAuthorizationID()); if (LOG.isDebugEnabled()) { LOG.debug("SASL server context established. Authenticated client: " - + user + ". Negotiated QoP is " + + ugi + ". Negotiated QoP is " + saslServer.getNegotiatedProperty(Sasl.QOP)); } metrics.authenticationSuccess(); - AUDITLOG.info(AUTH_SUCCESSFUL_FOR + user); + AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi); saslContextEstablished = true; } } @@ -1647,32 +1649,32 @@ public class RpcServer implements RpcServerInterface { setupCellBlockCodecs(this.connectionHeader); UserGroupInformation protocolUser = createUser(connectionHeader); if (!useSasl) { - user = protocolUser; - if (user != null) { - user.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod); + ugi = protocolUser; + if (ugi != null) { + ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod); } } else { // user is authenticated - user.setAuthenticationMethod(authMethod.authenticationMethod); + ugi.setAuthenticationMethod(authMethod.authenticationMethod); //Now we check if this is a proxy user case. If the protocol user is //different from the 'user', it is a proxy user scenario. However, //this is not allowed if user authenticated with DIGEST. if ((protocolUser != null) - && (!protocolUser.getUserName().equals(user.getUserName()))) { + && (!protocolUser.getUserName().equals(ugi.getUserName()))) { if (authMethod == AuthMethod.DIGEST) { // Not allowed to doAs if token authentication is used - throw new AccessDeniedException("Authenticated user (" + user + throw new AccessDeniedException("Authenticated user (" + ugi + ") doesn't match what the client claims to be (" + protocolUser + ")"); } else { // Effective user can be different from authenticated user // for simple auth or kerberos auth // The user is the real user. Now we create a proxy user - UserGroupInformation realUser = user; - user = UserGroupInformation.createProxyUser(protocolUser + UserGroupInformation realUser = ugi; + ugi = UserGroupInformation.createProxyUser(protocolUser .getUserName(), realUser); // Now the user is a proxy user, set Authentication method Proxy. - user.setAuthenticationMethod(AuthenticationMethod.PROXY); + ugi.setAuthenticationMethod(AuthenticationMethod.PROXY); } } } @@ -1758,8 +1760,9 @@ public class RpcServer implements RpcServerInterface { // Throw FatalConnectionException wrapping ACE so client does right thing and closes // down the connection instead of trying to read non-existent retun. throw new AccessDeniedException("Connection from " + this + " for service " + - connectionHeader.getServiceName() + " is unauthorized for user: " + user); + connectionHeader.getServiceName() + " is unauthorized for user: " + ugi); } + this.user = userProvider.create(this.ugi); } } @@ -1862,11 +1865,11 @@ public class RpcServer implements RpcServerInterface { // real user for the effective user, therefore not required to // authorize real user. doAs is allowed only for simple or kerberos // authentication - if (user != null && user.getRealUser() != null + if (ugi != null && ugi.getRealUser() != null && (authMethod != AuthMethod.DIGEST)) { - ProxyUsers.authorize(user, this.getHostAddress(), conf); + ProxyUsers.authorize(ugi, this.getHostAddress(), conf); } - authorize(user, connectionHeader, getHostInetAddress()); + authorize(ugi, connectionHeader, getHostInetAddress()); metrics.authorizationSuccess(); } catch (AuthorizationException ae) { if (LOG.isDebugEnabled()) {