HBASE-10173. Need HFile version check in security coprocessors

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1552503 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2013-12-20 06:41:53 +00:00
parent 6bd06665f1
commit edb5f7c0ae
5 changed files with 119 additions and 72 deletions

View File

@ -156,6 +156,11 @@ public class HFile {
*/
public static final int MAX_FORMAT_VERSION = 3;
/**
* Minimum HFile format version with support for persisting cell tags
*/
public static final int MIN_FORMAT_VERSION_WITH_TAGS = 3;
/** Default compression name: none. */
public final static String DEFAULT_COMPRESSION =
DEFAULT_COMPRESSION_ALGORITHM.getName();

View File

@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
@ -149,8 +150,18 @@ public class AccessController extends BaseRegionObserver
private UserProvider userProvider;
// flags if we are able to support cell ACLs
boolean canPersistCellACLs;
void initialize(RegionCoprocessorEnvironment e) throws IOException {
final HRegion region = e.getRegion();
canPersistCellACLs = HFile.getFormatVersion(e.getConfiguration()) >=
HFile.MIN_FORMAT_VERSION_WITH_TAGS;
if (!canPersistCellACLs) {
LOG.info("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
+ " is required to persist cell ACLs. Consider setting " + HFile.FORMAT_VERSION_KEY
+ " accordingly.");
}
Map<byte[], ListMultimap<String,TablePermission>> tables =
AccessControlLists.loadAll(region);
// For each table, write out the table's permissions to the respective
@ -475,81 +486,82 @@ public class AccessController extends BaseRegionObserver
// Table or CF permissions do not allow, enumerate the covered KVs. We
// can stop at the first which does not grant access.
Get get = new Get(row);
if (timestamp != HConstants.LATEST_TIMESTAMP) get.setTimeStamp(timestamp);
get.setMaxResultsPerColumnFamily(1); // Hold down memory use on wide rows
if (allVersions) {
get.setMaxVersions();
} else {
get.setMaxVersions(1);
}
for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
byte[] col = entry.getKey();
// TODO: HBASE-7114 could possibly unify the collection type in family
// maps so we would not need to do this
if (entry.getValue() instanceof Set) {
Set<byte[]> set = (Set<byte[]>)entry.getValue();
if (set == null || set.isEmpty()) {
get.addFamily(col);
} else {
for (byte[] qual: set) {
get.addColumn(col, qual);
}
}
} else if (entry.getValue() instanceof List) {
List<Cell> list = (List<Cell>)entry.getValue();
if (list == null || list.isEmpty()) {
get.addFamily(col);
} else {
for (Cell cell: list) {
get.addColumn(col, CellUtil.cloneQualifier(cell));
}
}
int cellsChecked = 0;
if (canPersistCellACLs) {
Get get = new Get(row);
if (timestamp != HConstants.LATEST_TIMESTAMP) get.setTimeStamp(timestamp);
get.setMaxResultsPerColumnFamily(1); // Hold down memory use on wide rows
if (allVersions) {
get.setMaxVersions();
} else {
throw new RuntimeException("Unhandled collection type " +
entry.getValue().getClass().getName());
get.setMaxVersions(1);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning for cells with " + get);
}
RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
List<Cell> cells = Lists.newArrayList();
int numCells = 0;
try {
boolean more = false;
do {
cells.clear();
more = scanner.next(cells);
for (Cell cell: cells) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found cell " + cell);
}
for (Action action: cellCheckActions) {
// Are there permissions for this user for the cell?
if (!authManager.authorize(user, getTableName(e), cell, false, action)) {
AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
user, action, getTableName(e), CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell));
logResult(authResult);
throw new AccessDeniedException("Insufficient permissions " +
authResult.toContextString());
for (Map.Entry<byte[], ? extends Collection<?>> entry: familyMap.entrySet()) {
byte[] col = entry.getKey();
// TODO: HBASE-7114 could possibly unify the collection type in family
// maps so we would not need to do this
if (entry.getValue() instanceof Set) {
Set<byte[]> set = (Set<byte[]>)entry.getValue();
if (set == null || set.isEmpty()) {
get.addFamily(col);
} else {
for (byte[] qual: set) {
get.addColumn(col, qual);
}
}
numCells++;
} else if (entry.getValue() instanceof List) {
List<Cell> list = (List<Cell>)entry.getValue();
if (list == null || list.isEmpty()) {
get.addFamily(col);
} else {
for (Cell cell: list) {
get.addColumn(col, CellUtil.cloneQualifier(cell));
}
}
} else {
throw new RuntimeException("Unhandled collection type " +
entry.getValue().getClass().getName());
}
} while (more);
} catch (AccessDeniedException ex) {
throw ex;
} catch (IOException ex) {
LOG.error("Exception while getting cells to calculate covering permission", ex);
} finally {
scanner.close();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Scanning for cells with " + get);
}
RegionScanner scanner = getRegion(e).getScanner(new Scan(get));
List<Cell> cells = Lists.newArrayList();
try {
boolean more = false;
do {
cells.clear();
more = scanner.next(cells);
for (Cell cell: cells) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found cell " + cell);
}
for (Action action: cellCheckActions) {
// Are there permissions for this user for the cell?
if (!authManager.authorize(user, getTableName(e), cell, false, action)) {
AuthResult authResult = AuthResult.deny(request, "Insufficient permissions",
user, action, getTableName(e), CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell));
logResult(authResult);
throw new AccessDeniedException("Insufficient permissions " +
authResult.toContextString());
}
}
cellsChecked++;
}
} while (more);
} catch (AccessDeniedException ex) {
throw ex;
} catch (IOException ex) {
LOG.error("Exception while getting cells to calculate covering permission", ex);
} finally {
scanner.close();
}
}
// If there were no cells to check, throw the ADE
if (numCells < 1) {
if (cellsChecked < 1) {
if (LOG.isTraceEnabled()) {
LOG.trace("No cells found with scan");
}
@ -562,7 +574,7 @@ public class AccessController extends BaseRegionObserver
// Log that authentication succeeded. We need to trade off logging maybe
// thousands of fine grained decisions with providing detail.
for (byte[] family: get.getFamilyMap().keySet()) {
for (byte[] family: familyMap.keySet()) {
for (Action action: actions) {
logResult(AuthResult.allow(request, "Permission granted", user, action,
getTableName(e), family, null));
@ -1108,7 +1120,11 @@ public class AccessController extends BaseRegionObserver
put.getFamilyCellMap(), put.getTimeStamp(), false, Permission.Action.WRITE);
byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
if (bytes != null) {
addCellPermissions(bytes, put.getFamilyCellMap());
if (canPersistCellACLs) {
addCellPermissions(bytes, put.getFamilyCellMap());
} else {
throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
}
}
}
@ -1158,7 +1174,11 @@ public class AccessController extends BaseRegionObserver
Action.READ, Action.WRITE);
byte[] bytes = put.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
if (bytes != null) {
addCellPermissions(bytes, put.getFamilyCellMap());
if (canPersistCellACLs) {
addCellPermissions(bytes, put.getFamilyCellMap());
} else {
throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
}
}
return result;
}
@ -1204,7 +1224,11 @@ public class AccessController extends BaseRegionObserver
Action.WRITE);
byte[] bytes = append.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
if (bytes != null) {
addCellPermissions(bytes, append.getFamilyCellMap());
if (canPersistCellACLs) {
addCellPermissions(bytes, append.getFamilyCellMap());
} else {
throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
}
}
return null;
}
@ -1220,7 +1244,11 @@ public class AccessController extends BaseRegionObserver
Action.WRITE);
byte[] bytes = increment.getAttribute(AccessControlConstants.OP_ATTRIBUTE_ACL);
if (bytes != null) {
addCellPermissions(bytes, increment.getFamilyCellMap());
if (canPersistCellACLs) {
addCellPermissions(bytes, increment.getFamilyCellMap());
} else {
throw new DoNotRetryIOException("Cell ACLs cannot be persisted");
}
}
return null;
}
@ -1228,7 +1256,13 @@ public class AccessController extends BaseRegionObserver
@Override
public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
// If the HFile version is insufficient to persist tags, we won't have any
// work to do here
if (!canPersistCellACLs) {
return newCell;
}
// Collect any ACLs from the old cell
List<Tag> tags = Lists.newArrayList();
ListMultimap<String,Permission> perms = ArrayListMultimap.create();
if (oldCell != null) {

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.master.MasterServices;
@ -163,6 +164,11 @@ public class VisibilityController extends BaseRegionObserver implements MasterOb
@Override
public void start(CoprocessorEnvironment env) throws IOException {
this.conf = env.getConfiguration();
if (HFile.getFormatVersion(conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
throw new RuntimeException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
+ " is required to persist visibility labels. Consider setting " + HFile.FORMAT_VERSION_KEY
+ " accordingly.");
}
ZooKeeperWatcher zk = null;
if (env instanceof MasterCoprocessorEnvironment) {
// if running on HMaster

View File

@ -127,6 +127,7 @@ public class TestScannersWithLabels {
conf = TEST_UTIL.getConfiguration();
conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS,
SimpleScanLabelGenerator.class, ScanLabelGenerator.class);
conf.setInt("hfile.format.version", 3);
conf.set("hbase.superuser", SUPERUSER.getShortName());
conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());

View File

@ -130,6 +130,7 @@ public static void beforeClass() throws Exception {
VisibilityController.class.getName());
conf.set("hbase.coprocessor.region.classes",
VisibilityController.class.getName());
conf.setInt("hfile.format.version", 3);
UTIL.startMiniCluster(1);
// Wait for the labels table to become available
UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);