HBASE-5385 Delete table/column should delete stored permissions on -acl- table (Matteo Bertozi)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1337512 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-05-12 11:09:59 +00:00
parent 75bf5b04c4
commit 1bbed5ab7b
2 changed files with 59 additions and 6 deletions

View File

@ -41,6 +41,9 @@ import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.Text;
@ -190,11 +193,10 @@ public class AccessControlLists {
/**
* Remove specified table from the _acl_ table.
*/
static void removeTablePermissions(Configuration conf, byte[] tableName)
throws IOException{
static void removeTablePermissions(Configuration conf, byte[] tableName)
throws IOException{
Delete d = new Delete(tableName);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing permissions of removed table "+ Bytes.toString(tableName));
}
@ -205,7 +207,55 @@ public class AccessControlLists {
acls.delete(d);
} finally {
if (acls != null) acls.close();
}
}
}
/**
* Remove specified table column from the _acl_ table.
*/
static void removeTablePermissions(Configuration conf, byte[] tableName, byte[] column)
throws IOException{
if (LOG.isDebugEnabled()) {
LOG.debug("Removing permissions of removed column " + Bytes.toString(column) +
" from table "+ Bytes.toString(tableName));
}
HTable acls = null;
try {
acls = new HTable(conf, ACL_TABLE_NAME);
Scan scan = new Scan();
scan.addFamily(ACL_LIST_FAMILY);
String columnName = Bytes.toString(column);
scan.setFilter(new QualifierFilter(CompareOp.EQUAL, new RegexStringComparator(
String.format("(%s%s%s)|(%s%s)$",
ACL_KEY_DELIMITER, columnName, ACL_KEY_DELIMITER,
ACL_KEY_DELIMITER, columnName))));
Set<byte[]> qualifierSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
ResultScanner scanner = acls.getScanner(scan);
try {
for (Result res : scanner) {
for (byte[] q : res.getFamilyMap(ACL_LIST_FAMILY).navigableKeySet()) {
qualifierSet.add(q);
}
}
} finally {
scanner.close();
}
if (qualifierSet.size() > 0) {
Delete d = new Delete(tableName);
for (byte[] qualifier : qualifierSet) {
d.deleteColumns(ACL_LIST_FAMILY, qualifier);
}
acls.delete(d);
}
} finally {
if (acls != null) acls.close();
}
}
/**

View File

@ -595,7 +595,10 @@ public class AccessController extends BaseRegionObserver
byte[] tableName, byte[] col) throws IOException {}
@Override
public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> c,
byte[] tableName, byte[] col) throws IOException {}
byte[] tableName, byte[] col) throws IOException {
AccessControlLists.removeTablePermissions(c.getEnvironment().getConfiguration(),
tableName, col);
}
@Override
public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> c,
byte[] tableName, byte[] col) throws IOException {}