HBASE-13296 Fix the deletion of acl notify nodes for namespace.

This commit is contained in:
Srikanth Srungarapu 2015-04-01 11:15:40 -07:00
parent 64449e2ae7
commit 485800830a
5 changed files with 100 additions and 38 deletions

View File

@ -1302,7 +1302,8 @@ public class AccessController extends BaseMasterAndRegionObserver
return null;
}
});
LOG.info(namespace + "entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
this.authManager.getZKPermissionWatcher().deleteNamespaceACLNode(namespace);
LOG.info(namespace + " entry deleted in " + AccessControlLists.ACL_TABLE_NAME + " table.");
}
@Override

View File

@ -211,4 +211,21 @@ public class ZKPermissionWatcher extends ZooKeeperListener {
watcher.abort("Failed deleting node " + zkNode, e);
}
}
/***
* Delete the acl notify node of namespace
*/
public void deleteNamespaceACLNode(final String namespace) {
String zkNode = ZKUtil.joinZNode(watcher.baseZNode, ACL_NODE);
zkNode = ZKUtil.joinZNode(zkNode, AccessControlLists.NAMESPACE_PREFIX + namespace);
try {
ZKUtil.deleteNode(watcher, zkNode);
} catch (KeeperException.NoNodeException e) {
LOG.warn("No acl notify node of namespace '" + namespace + "'");
} catch (KeeperException e) {
LOG.error("Failed deleting acl node of namespace '" + namespace + "'", e);
watcher.abort("Failed deleting node " + zkNode, e);
}
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.Waiter.Predicate;
@ -655,6 +656,16 @@ public class SecureTestUtil {
deleteTable(testUtil, testUtil.getHBaseAdmin(), tableName);
}
public static void createNamespace(HBaseTestingUtility testUtil, NamespaceDescriptor nsDesc)
throws Exception {
testUtil.getHBaseAdmin().createNamespace(nsDesc);
}
public static void deleteNamespace(HBaseTestingUtility testUtil, String namespace)
throws Exception {
testUtil.getHBaseAdmin().deleteNamespace(namespace);
}
public static void deleteTable(HBaseTestingUtility testUtil, Admin admin, TableName tableName)
throws Exception {
// NOTE: We need a latch because admin is not sync,
@ -671,4 +682,12 @@ public class SecureTestUtil {
observer.tableDeletionLatch.await();
observer.tableDeletionLatch = null;
}
public static String convertToNamespace(String namespace) {
return AccessControlLists.NAMESPACE_PREFIX + namespace;
}
public static String convertToGroup(String group) {
return AccessControlLists.GROUP_PREFIX + group;
}
}

View File

@ -2396,7 +2396,7 @@ public class TestAccessController extends SecureTestUtil {
public void testGetNamespacePermission() throws Exception {
String namespace = "testGetNamespacePermission";
NamespaceDescriptor desc = NamespaceDescriptor.create(namespace).build();
TEST_UTIL.getMiniHBaseCluster().getMaster().createNamespace(desc);
createNamespace(TEST_UTIL, desc);
grantOnNamespace(TEST_UTIL, USER_NONE.getShortName(), namespace, Permission.Action.READ);
try {
List<UserPermission> namespacePermissions = AccessControlClient.getUserPermissions(
@ -2406,7 +2406,7 @@ public class TestAccessController extends SecureTestUtil {
} catch (Throwable thw) {
throw new HBaseException(thw);
}
TEST_UTIL.getMiniHBaseCluster().getMaster().deleteNamespace(namespace);
deleteNamespace(TEST_UTIL, namespace);
}
@Test
@ -2479,7 +2479,7 @@ public class TestAccessController extends SecureTestUtil {
String ns = "testNamespace";
NamespaceDescriptor desc = NamespaceDescriptor.create(ns).build();
final TableName table2 = TableName.valueOf(ns, tableName);
TEST_UTIL.getMiniHBaseCluster().getMaster().createNamespace(desc);
createNamespace(TEST_UTIL, desc);
htd = new HTableDescriptor(table2);
htd.addFamily(new HColumnDescriptor(family));
createTable(TEST_UTIL, htd);
@ -2507,7 +2507,7 @@ public class TestAccessController extends SecureTestUtil {
deleteTable(TEST_UTIL, table1);
deleteTable(TEST_UTIL, table2);
TEST_UTIL.getMiniHBaseCluster().getMaster().deleteNamespace(ns);
deleteNamespace(TEST_UTIL, ns);
}
private void verifyAnyCreate(AccessTestAction action) throws Exception {

View File

@ -22,7 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
@ -42,12 +42,15 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.TestTableName;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@ -68,7 +71,10 @@ public class TestAccessController2 extends SecureTestUtil {
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Configuration conf;
private static Connection connection;
/** The systemUserConnection created here is tied to the system user. In case, you are planning
* to create AccessTestAction, DON'T use this systemUserConnection as the 'doAs' user
* gets eclipsed by the system user. */
private static Connection systemUserConnection;
private final static byte[] Q1 = Bytes.toBytes("q1");
private final static byte[] value1 = Bytes.toBytes("value1");
@ -108,47 +114,35 @@ public class TestAccessController2 extends SecureTestUtil {
TESTGROUP2_USER1 =
User.createUserForTesting(conf, "testgroup2_user2", new String[] { TESTGROUP_2 });
connection = ConnectionFactory.createConnection(conf);
systemUserConnection = ConnectionFactory.createConnection(conf);
}
@Before
public void setUp() throws Exception {
TEST_UTIL.getHBaseAdmin().createNamespace(NamespaceDescriptor.create(namespace).build());
try (Table table =
TEST_UTIL.createTable(tableName,
createNamespace(TEST_UTIL, NamespaceDescriptor.create(namespace).build());
try (Table table = TEST_UTIL.createTable(tableName,
new String[] { Bytes.toString(TEST_FAMILY), Bytes.toString(TEST_FAMILY_2) })) {
TEST_UTIL.waitTableEnabled(tableName);
TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
List<Put> puts = new ArrayList<Put>(5);
Put put_1 = new Put(TEST_ROW);
put_1.addColumn(TEST_FAMILY, Q1, value1);
Put put_2 = new Put(TEST_ROW_2);
put_2.addColumn(TEST_FAMILY, Q2, value2);
Put put_3 = new Put(TEST_ROW_3);
put_3.addColumn(TEST_FAMILY_2, Q1, value1);
puts.add(put_1);
puts.add(put_2);
puts.add(put_3);
table.put(puts);
// Ingesting test data.
table.put(Arrays.asList(new Put(TEST_ROW).addColumn(TEST_FAMILY, Q1, value1),
new Put(TEST_ROW_2).addColumn(TEST_FAMILY, Q2, value2),
new Put(TEST_ROW_3).addColumn(TEST_FAMILY_2, Q1, value1)));
}
assertEquals(1, AccessControlLists.getTablePermissions(conf, tableName).size());
try {
assertEquals(1, AccessControlClient.getUserPermissions(connection, tableName.toString())
.size());
assertEquals(1, AccessControlClient.getUserPermissions(systemUserConnection,
tableName.toString()).size());
} catch (Throwable e) {
LOG.error("Error during call of AccessControlClient.getUserPermissions. ", e);
}
// setupOperations();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
connection.close();
systemUserConnection.close();
TEST_UTIL.shutdownMiniCluster();
}
@ -161,7 +155,7 @@ public class TestAccessController2 extends SecureTestUtil {
// Test deleted the table, no problem
LOG.info("Test deleted table " + tableName);
}
TEST_UTIL.getHBaseAdmin().deleteNamespace(namespace);
deleteNamespace(TEST_UTIL, namespace);
// Verify all table/namespace permissions are erased
assertEquals(0, AccessControlLists.getTablePermissions(conf, tableName).size());
assertEquals(0, AccessControlLists.getNamespacePermissions(conf, namespace).size());
@ -397,17 +391,17 @@ public class TestAccessController2 extends SecureTestUtil {
// Verify user from a group which has table level access can read all the data and group which
// has no access can't read any data.
grantOnTable(TEST_UTIL, '@' + TESTGROUP_1, tableName, null, null, Permission.Action.READ);
grantOnTable(TEST_UTIL, convertToGroup(TESTGROUP_1), tableName, null, null, Action.READ);
verifyAllowed(TESTGROUP1_USER1, scanTableActionForGroupWithTableLevelAccess);
verifyDenied(TESTGROUP2_USER1, scanTableActionForGroupWithTableLevelAccess);
// Verify user from a group whose table level access has been revoked can't read any data.
revokeFromTable(TEST_UTIL, '@' + TESTGROUP_1, tableName, null, null);
revokeFromTable(TEST_UTIL, convertToGroup(TESTGROUP_1), tableName, null, null);
verifyDenied(TESTGROUP1_USER1, scanTableActionForGroupWithTableLevelAccess);
// Verify user from a group which has column family level access can read all the data
// belonging to that family and group which has no access can't read any data.
grantOnTable(TEST_UTIL, '@' + TESTGROUP_1, tableName, TEST_FAMILY, null,
grantOnTable(TEST_UTIL, convertToGroup(TESTGROUP_1), tableName, TEST_FAMILY, null,
Permission.Action.READ);
verifyAllowed(TESTGROUP1_USER1, scanTableActionForGroupWithFamilyLevelAccess);
verifyDenied(TESTGROUP1_USER1, scanFamilyActionForGroupWithFamilyLevelAccess);
@ -416,12 +410,12 @@ public class TestAccessController2 extends SecureTestUtil {
// Verify user from a group whose column family level access has been revoked can't read any
// data from that family.
revokeFromTable(TEST_UTIL, '@' + TESTGROUP_1, tableName, TEST_FAMILY, null);
revokeFromTable(TEST_UTIL, convertToGroup(TESTGROUP_1), tableName, TEST_FAMILY, null);
verifyDenied(TESTGROUP1_USER1, scanTableActionForGroupWithFamilyLevelAccess);
// Verify user from a group which has column qualifier level access can read data that has this
// family and qualifier, and group which has no access can't read any data.
grantOnTable(TEST_UTIL, '@' + TESTGROUP_1, tableName, TEST_FAMILY, Q1, Permission.Action.READ);
grantOnTable(TEST_UTIL, convertToGroup(TESTGROUP_1), tableName, TEST_FAMILY, Q1, Action.READ);
verifyAllowed(TESTGROUP1_USER1, scanTableActionForGroupWithQualifierLevelAccess);
verifyDenied(TESTGROUP1_USER1, scanFamilyActionForGroupWithQualifierLevelAccess);
verifyDenied(TESTGROUP1_USER1, scanQualifierActionForGroupWithQualifierLevelAccess);
@ -431,7 +425,38 @@ public class TestAccessController2 extends SecureTestUtil {
// Verify user from a group whose column qualifier level access has been revoked can't read the
// data having this column family and qualifier.
revokeFromTable(TEST_UTIL, '@' + TESTGROUP_1, tableName, TEST_FAMILY, Q1);
revokeFromTable(TEST_UTIL, convertToGroup(TESTGROUP_1), tableName, TEST_FAMILY, Q1);
verifyDenied(TESTGROUP1_USER1, scanTableActionForGroupWithQualifierLevelAccess);
}
@Test
public void testACLZNodeDeletion() throws Exception {
String baseAclZNode = "/hbase/acl/";
String ns = "testACLZNodeDeletionNamespace";
NamespaceDescriptor desc = NamespaceDescriptor.create(ns).build();
createNamespace(TEST_UTIL, desc);
final TableName table = TableName.valueOf(ns, "testACLZNodeDeletionTable");
final byte[] family = Bytes.toBytes("f1");
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor(family));
createTable(TEST_UTIL, htd);
// Namespace needs this, as they follow the lazy creation of ACL znode.
grantOnNamespace(TEST_UTIL, TESTGROUP1_USER1.getShortName(), ns, Action.ADMIN);
ZooKeeperWatcher zkw = TEST_UTIL.getMiniHBaseCluster().getMaster().getZooKeeper();
assertTrue("The acl znode for table should exist", ZKUtil.checkExists(zkw, baseAclZNode +
table.getNameAsString()) != -1);
assertTrue("The acl znode for namespace should exist", ZKUtil.checkExists(zkw, baseAclZNode +
convertToNamespace(ns)) != -1);
revokeFromNamespace(TEST_UTIL, TESTGROUP1_USER1.getShortName(), ns, Action.ADMIN);
deleteTable(TEST_UTIL, table);
deleteNamespace(TEST_UTIL, ns);
assertTrue("The acl znode for table should have been deleted",
ZKUtil.checkExists(zkw, baseAclZNode + table.getNameAsString()) == -1);
assertTrue( "The acl znode for namespace should have been deleted",
ZKUtil.checkExists(zkw, baseAclZNode + convertToNamespace(ns)) == -1);
}
}