HBASE-10226. [AccessController] Namespace grants not always checked

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1553327 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2013-12-24 20:19:14 +00:00
parent b087b697c2
commit 10fbfddf0e
3 changed files with 89 additions and 44 deletions

View File

@ -374,7 +374,7 @@ public class TableAuthManager {
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
// User is authorized at table or CF level
if (authorizeUser(user.getShortName(), table, family, qualifier, action)) {
if (authorizeUser(user, table, family, qualifier, action)) {
return true;
}
String groupNames[] = user.getGroupNames();
@ -409,16 +409,17 @@ public class TableAuthManager {
}
public boolean authorize(User user, String namespace, Permission.Action action) {
if (authorizeUser(user.getShortName(), action)) {
// Global authorizations supercede namespace level
if (authorizeUser(user, action)) {
return true;
}
// Check namespace permissions
PermissionCache<TablePermission> tablePerms = nsCache.get(namespace);
if (tablePerms != null) {
List<TablePermission> userPerms = tablePerms.getUser(user.getShortName());
if (authorize(userPerms, namespace, action)) {
return true;
}
String[] groupNames = user.getGroupNames();
if (groupNames != null) {
for (String group : groupNames) {
@ -451,39 +452,39 @@ public class TableAuthManager {
* Checks global authorization for a specific action for a user, based on the
* stored user permissions.
*/
public boolean authorizeUser(String username, Permission.Action action) {
return authorize(globalCache.getUser(username), action);
public boolean authorizeUser(User user, Permission.Action action) {
return authorize(globalCache.getUser(user.getShortName()), action);
}
/**
* Checks authorization to a given table and column family for a user, based on the
* stored user permissions.
*
* @param username
* @param user
* @param table
* @param family
* @param action
* @return true if known and authorized, false otherwise
*/
public boolean authorizeUser(String username, TableName table, byte[] family,
public boolean authorizeUser(User user, TableName table, byte[] family,
Permission.Action action) {
return authorizeUser(username, table, family, null, action);
return authorizeUser(user, table, family, null, action);
}
public boolean authorizeUser(String username, TableName table, byte[] family,
public boolean authorizeUser(User user, TableName table, byte[] family,
byte[] qualifier, Permission.Action action) {
// global authorization supercedes table level
if (authorizeUser(username, action)) {
if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
// Global and namespace authorizations supercede table level
if (authorize(user, table.getNamespaceAsString(), action)) {
return true;
}
if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
return authorize(getTablePermissions(table).getUser(username), table, family,
// Check table permissions
return authorize(getTablePermissions(table).getUser(user.getShortName()), table, family,
qualifier, action);
}
/**
* Checks authorization for a given action for a group, based on the stored
* Checks global authorization for a given action for a group, based on the stored
* permissions.
*/
public boolean authorizeGroup(String groupName, Permission.Action action) {
@ -501,17 +502,23 @@ public class TableAuthManager {
*/
public boolean authorizeGroup(String groupName, TableName table, byte[] family,
Permission.Action action) {
// global authorization supercedes table level
// Global authorization supercedes table level
if (authorizeGroup(groupName, action)) {
return true;
}
if (table == null) table = AccessControlLists.ACL_TABLE_NAME;
// Namespace authorization supercedes table level
if (authorize(getNamespacePermissions(table.getNamespaceAsString()).getGroup(groupName),
table, family, action)) {
return true;
}
// Check table level
return authorize(getTablePermissions(table).getGroup(groupName), table, family, action);
}
public boolean authorize(User user, TableName table, byte[] family,
byte[] qualifier, Permission.Action action) {
if (authorizeUser(user.getShortName(), table, family, qualifier, action)) {
if (authorizeUser(user, table, family, qualifier, action)) {
return true;
}

View File

@ -27,7 +27,6 @@ import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -58,7 +57,6 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@ -2460,4 +2458,39 @@ public class TestAccessController extends SecureTestUtil {
verifyAllowed(deleteTableAction, TABLE_ADMIN);
}
@Test
public void testNamespaceUserGrant() throws Exception {
AccessTestAction getAction = new AccessTestAction() {
@Override
public Object run() throws Exception {
HTable t = new HTable(conf, TEST_TABLE.getTableName());
try {
return t.get(new Get(TEST_ROW));
} finally {
t.close();
}
}
};
verifyDenied(getAction, USER_NONE);
// Grant namespace READ to USER_NONE, this should supercede any table permissions
HTable acl = new HTable(conf, AccessControlLists.ACL_TABLE_NAME);
try {
BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_BYTE_ARRAY);
AccessControlService.BlockingInterface protocol =
AccessControlService.newBlockingStub(service);
AccessControlProtos.GrantRequest request = RequestConverter.
buildGrantRequest(USER_NONE.getShortName(),
TEST_TABLE.getTableName().getNamespaceAsString(),
AccessControlProtos.Permission.Action.READ);
protocol.grant(null, request);
} finally {
acl.close();
}
// Now USER_NONE should be able to read also
verifyAllowed(getAction, USER_NONE);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -87,22 +88,26 @@ public class TestZKPermissionsWatcher {
@Test
public void testPermissionsWatcher() throws Exception {
assertFalse(AUTH_A.authorizeUser("george", TEST_TABLE, null,
Configuration conf = UTIL.getConfiguration();
User george = User.createUserForTesting(conf, "george", new String[] { });
User hubert = User.createUserForTesting(conf, "hubert", new String[] { });
assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_A.authorizeUser("george", TEST_TABLE, null,
assertFalse(AUTH_A.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertFalse(AUTH_A.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_A.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertFalse(AUTH_B.authorizeUser("george", TEST_TABLE, null,
assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_B.authorizeUser("george", TEST_TABLE, null,
assertFalse(AUTH_B.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertFalse(AUTH_B.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_B.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.WRITE));
// update ACL: george RW
@ -110,7 +115,7 @@ public class TestZKPermissionsWatcher {
acl.add(new TablePermission(TEST_TABLE, null, TablePermission.Action.READ,
TablePermission.Action.WRITE));
final long mtimeB = AUTH_B.getMTime();
AUTH_A.setTableUserPermissions("george", TEST_TABLE, acl);
AUTH_A.setTableUserPermissions(george.getShortName(), TEST_TABLE, acl);
// Wait for the update to propagate
UTIL.waitFor(10000, 100, new Predicate<Exception>() {
@Override
@ -121,21 +126,21 @@ public class TestZKPermissionsWatcher {
Thread.sleep(100);
// check it
assertTrue(AUTH_A.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.READ));
assertTrue(AUTH_A.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertTrue(AUTH_B.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.READ));
assertTrue(AUTH_B.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertFalse(AUTH_A.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_A.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertFalse(AUTH_B.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_B.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.WRITE));
// update ACL: hubert R
@ -153,21 +158,21 @@ public class TestZKPermissionsWatcher {
Thread.sleep(100);
// check it
assertTrue(AUTH_A.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.READ));
assertTrue(AUTH_A.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_A.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertTrue(AUTH_B.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.READ));
assertTrue(AUTH_B.authorizeUser("george", TEST_TABLE, null,
assertTrue(AUTH_B.authorizeUser(george, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertTrue(AUTH_A.authorizeUser("hubert", TEST_TABLE, null,
assertTrue(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_A.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_A.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.WRITE));
assertTrue(AUTH_B.authorizeUser("hubert", TEST_TABLE, null,
assertTrue(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.READ));
assertFalse(AUTH_B.authorizeUser("hubert", TEST_TABLE, null,
assertFalse(AUTH_B.authorizeUser(hubert, TEST_TABLE, null,
TablePermission.Action.WRITE));
}
}