HBASE-14597 Fix Groups cache in multi-threaded env
This commit is contained in:
parent
2c301d535b
commit
f4a4c3a489
|
@ -23,7 +23,11 @@ import java.io.IOException;
|
||||||
import java.lang.reflect.UndeclaredThrowableException;
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -32,6 +36,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.util.Methods;
|
import org.apache.hadoop.hbase.util.Methods;
|
||||||
import org.apache.hadoop.mapred.JobConf;
|
import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
@ -282,7 +287,7 @@ public abstract class User {
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public static final class SecureHadoopUser extends User {
|
public static final class SecureHadoopUser extends User {
|
||||||
private String shortName;
|
private String shortName;
|
||||||
private LoadingCache<UserGroupInformation, String[]> cache;
|
private LoadingCache<String, String[]> cache;
|
||||||
|
|
||||||
public SecureHadoopUser() throws IOException {
|
public SecureHadoopUser() throws IOException {
|
||||||
ugi = UserGroupInformation.getCurrentUser();
|
ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
@ -295,7 +300,7 @@ public abstract class User {
|
||||||
}
|
}
|
||||||
|
|
||||||
public SecureHadoopUser(UserGroupInformation ugi,
|
public SecureHadoopUser(UserGroupInformation ugi,
|
||||||
LoadingCache<UserGroupInformation, String[]> cache) {
|
LoadingCache<String, String[]> cache) {
|
||||||
this.ugi = ugi;
|
this.ugi = ugi;
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
}
|
}
|
||||||
|
@ -316,7 +321,7 @@ public abstract class User {
|
||||||
public String[] getGroupNames() {
|
public String[] getGroupNames() {
|
||||||
if (cache != null) {
|
if (cache != null) {
|
||||||
try {
|
try {
|
||||||
return this.cache.get(ugi);
|
return this.cache.get(getShortName());
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
return new String[0];
|
return new String[0];
|
||||||
}
|
}
|
||||||
|
@ -387,6 +392,13 @@ public abstract class User {
|
||||||
/** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
|
/** @see User#createUserForTesting(org.apache.hadoop.conf.Configuration, String, String[]) */
|
||||||
public static User createUserForTesting(Configuration conf,
|
public static User createUserForTesting(Configuration conf,
|
||||||
String name, String[] groups) {
|
String name, String[] groups) {
|
||||||
|
synchronized (UserProvider.class) {
|
||||||
|
if (!(UserProvider.groups instanceof TestingGroups)) {
|
||||||
|
UserProvider.groups = new TestingGroups(UserProvider.groups);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
((TestingGroups)UserProvider.groups).setUserGroups(name, groups);
|
||||||
return new SecureHadoopUser(UserGroupInformation.createUserForTesting(name, groups));
|
return new SecureHadoopUser(UserGroupInformation.createUserForTesting(name, groups));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,4 +428,30 @@ public abstract class User {
|
||||||
return UserGroupInformation.isSecurityEnabled();
|
return UserGroupInformation.isSecurityEnabled();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class TestingGroups extends Groups {
|
||||||
|
private final Map<String, List<String>> userToGroupsMapping =
|
||||||
|
new HashMap<String,List<String>>();
|
||||||
|
private Groups underlyingImplementation;
|
||||||
|
|
||||||
|
TestingGroups(Groups underlyingImplementation) {
|
||||||
|
super(new Configuration());
|
||||||
|
this.underlyingImplementation = underlyingImplementation;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<String> getGroups(String user) throws IOException {
|
||||||
|
List<String> result = userToGroupsMapping.get(user);
|
||||||
|
|
||||||
|
if (result == null) {
|
||||||
|
result = underlyingImplementation.getGroups(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setUserGroups(String user, String[] groups) {
|
||||||
|
userToGroupsMapping.put(user, Arrays.asList(groups));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
package org.apache.hadoop.hbase.security;
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -33,6 +35,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.hbase.BaseConfigurable;
|
import org.apache.hadoop.hbase.BaseConfigurable;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
@ -48,12 +51,20 @@ public class UserProvider extends BaseConfigurable {
|
||||||
1,
|
1,
|
||||||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("group-cache-%d").build()));
|
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("group-cache-%d").build()));
|
||||||
|
|
||||||
private LoadingCache<UserGroupInformation, String[]> groupCache = null;
|
private LoadingCache<String, String[]> groupCache = null;
|
||||||
|
|
||||||
|
static Groups groups = Groups.getUserToGroupsMappingService();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setConf(Configuration conf) {
|
public void setConf(final Configuration conf) {
|
||||||
super.setConf(conf);
|
super.setConf(conf);
|
||||||
|
|
||||||
|
synchronized (UserProvider.class) {
|
||||||
|
if (!(groups instanceof User.TestingGroups)) {
|
||||||
|
groups = Groups.getUserToGroupsMappingService(conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
long cacheTimeout =
|
long cacheTimeout =
|
||||||
getConf().getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
|
getConf().getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS,
|
||||||
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
|
CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS_DEFAULT) * 1000;
|
||||||
|
@ -67,21 +78,34 @@ public class UserProvider extends BaseConfigurable {
|
||||||
.concurrencyLevel(20)
|
.concurrencyLevel(20)
|
||||||
// create the loader
|
// create the loader
|
||||||
// This just delegates to UGI.
|
// This just delegates to UGI.
|
||||||
.build(new CacheLoader<UserGroupInformation, String[]>() {
|
.build(new CacheLoader<String, String[]>() {
|
||||||
|
|
||||||
|
// Since UGI's don't hash based on the user id
|
||||||
|
// The cache needs to be keyed on the same thing that Hadoop's Groups class
|
||||||
|
// uses. So this cache uses shortname.
|
||||||
@Override
|
@Override
|
||||||
public String[] load(UserGroupInformation ugi) throws Exception {
|
public String[] load(String ugi) throws Exception {
|
||||||
return ugi.getGroupNames();
|
return getGroupStrings(ugi);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String[] getGroupStrings(String ugi) {
|
||||||
|
try {
|
||||||
|
Set<String> result = new LinkedHashSet<String>(groups.getGroups(ugi));
|
||||||
|
return result.toArray(new String[result.size()]);
|
||||||
|
} catch (Exception e) {
|
||||||
|
return new String[0];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Provide the reload function that uses the executor thread.
|
// Provide the reload function that uses the executor thread.
|
||||||
public ListenableFuture<String[]> reload(final UserGroupInformation k,
|
public ListenableFuture<String[]> reload(final String k,
|
||||||
String[] oldValue) throws Exception {
|
String[] oldValue) throws Exception {
|
||||||
|
|
||||||
return executor.submit(new Callable<String[]>() {
|
return executor.submit(new Callable<String[]>() {
|
||||||
UserGroupInformation userGroupInformation = k;
|
|
||||||
@Override
|
@Override
|
||||||
public String[] call() throws Exception {
|
public String[] call() throws Exception {
|
||||||
return userGroupInformation.getGroupNames();
|
return getGroupStrings(k);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,30 +18,94 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.security;
|
package org.apache.hadoop.hbase.security;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.SystemUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
@Category(SmallTests.class)
|
@Category(SmallTests.class)
|
||||||
public class TestUser {
|
public class TestUser {
|
||||||
private static final Log LOG = LogFactory.getLog(TestUser.class);
|
private static final Log LOG = LogFactory.getLog(TestUser.class);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateUserForTestingGroupCache() throws Exception {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
User uCreated = User.createUserForTesting(conf, "group_user", new String[] { "MYGROUP" });
|
||||||
|
UserProvider up = UserProvider.instantiate(conf);
|
||||||
|
User uProvided = up.create(UserGroupInformation.createRemoteUser("group_user"));
|
||||||
|
assertArrayEquals(uCreated.getGroupNames(), uProvided.getGroupNames());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCacheGetGroups() throws Exception {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
UserProvider up = UserProvider.instantiate(conf);
|
||||||
|
|
||||||
|
// VERY unlikely that this user will exist on the box.
|
||||||
|
// This should mean the user has no groups.
|
||||||
|
String nonUser = "kklvfnvhdhcenfnniilggljhdecjhidkle";
|
||||||
|
|
||||||
|
// Create two UGI's for this username
|
||||||
|
UserGroupInformation ugiOne = UserGroupInformation.createRemoteUser(nonUser);
|
||||||
|
UserGroupInformation ugiTwo = UserGroupInformation.createRemoteUser(nonUser);
|
||||||
|
|
||||||
|
// Now try and get the user twice.
|
||||||
|
User uOne = up.create(ugiOne);
|
||||||
|
User uTwo = up.create(ugiTwo);
|
||||||
|
|
||||||
|
// Make sure that we didn't break groups and everything worked well.
|
||||||
|
assertArrayEquals(uOne.getGroupNames(),uTwo.getGroupNames());
|
||||||
|
|
||||||
|
// Check that they are referentially equal.
|
||||||
|
// Since getting a group for a users that doesn't exist creates a new string array
|
||||||
|
// the only way that they should be referentially equal is if the cache worked and
|
||||||
|
// made sure we didn't go to hadoop's script twice.
|
||||||
|
assertTrue(uOne.getGroupNames() == uTwo.getGroupNames());
|
||||||
|
assertEquals(0, ugiOne.getGroupNames().length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCacheGetGroupsRoot() throws Exception {
|
||||||
|
// Windows users don't have a root user.
|
||||||
|
// However pretty much every other *NIX os will have root.
|
||||||
|
if (!SystemUtils.IS_OS_WINDOWS) {
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
UserProvider up = UserProvider.instantiate(conf);
|
||||||
|
|
||||||
|
|
||||||
|
String rootUserName = "root";
|
||||||
|
|
||||||
|
// Create two UGI's for this username
|
||||||
|
UserGroupInformation ugiOne = UserGroupInformation.createRemoteUser(rootUserName);
|
||||||
|
UserGroupInformation ugiTwo = UserGroupInformation.createRemoteUser(rootUserName);
|
||||||
|
|
||||||
|
// Now try and get the user twice.
|
||||||
|
User uOne = up.create(ugiOne);
|
||||||
|
User uTwo = up.create(ugiTwo);
|
||||||
|
|
||||||
|
// Make sure that we didn't break groups and everything worked well.
|
||||||
|
assertArrayEquals(uOne.getGroupNames(),uTwo.getGroupNames());
|
||||||
|
String[] groupNames = ugiOne.getGroupNames();
|
||||||
|
assertTrue(groupNames.length > 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBasicAttributes() throws Exception {
|
public void testBasicAttributes() throws Exception {
|
||||||
Configuration conf = HBaseConfiguration.create();
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
|
Loading…
Reference in New Issue