HDFS-5612. NameNode: change all permission checks to enforce ACLs in addition to permissions. Contributued by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1559281 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Haohui Mai 2014-01-17 23:28:45 +00:00
parent 5300294fbe
commit 6a52febfbd
2 changed files with 126 additions and 6 deletions

View File

@ -33,6 +33,9 @@ HDFS-4685 (Unreleased)
HDFS-5758. NameNode: complete implementation of inode modifications for
ACLs. (Chris Nauroth via wheat9)
HDFS-5612. NameNode: change all permission checks to enforce ACLs in
addition to permissions. (Chris Nauroth via wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -20,16 +20,21 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
/**
* Class that helps in checking file system permission.
@ -42,12 +47,27 @@ class FSPermissionChecker {
static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
/** @return a string for throwing {@link AccessControlException} */
private static String toAccessControlString(INode inode) {
return "\"" + inode.getFullPathName() + "\":"
+ inode.getUserName() + ":" + inode.getGroupName()
+ ":" + (inode.isDirectory()? "d": "-") + inode.getFsPermission();
private String toAccessControlString(INode inode, int snapshotId,
FsAction access, FsPermission mode) {
return toAccessControlString(inode, snapshotId, access, mode, null);
}
/** @return a string for throwing {@link AccessControlException} */
private String toAccessControlString(INode inode, int snapshotId,
FsAction access, FsPermission mode, List<AclEntry> featureEntries) {
StringBuilder sb = new StringBuilder("Permission denied: ")
.append("user=").append(user).append(", ")
.append("access=").append(access).append(", ")
.append("inode=\"").append(inode.getFullPathName()).append("\":")
.append(inode.getUserName(snapshotId)).append(':')
.append(inode.getGroupName(snapshotId)).append(':')
.append(inode.isDirectory() ? 'd' : '-')
.append(mode);
if (featureEntries != null) {
sb.append(':').append(StringUtils.join(",", featureEntries));
}
return sb.toString();
}
private final UserGroupInformation ugi;
private final String user;
@ -219,7 +239,24 @@ private void check(INode inode, int snapshotId, FsAction access
return;
}
FsPermission mode = inode.getFsPermission(snapshotId);
if (mode.getAclBit()) {
// TODO: handling of INodeReference?
AclFeature aclFeature = inode instanceof INodeWithAdditionalFields ?
((INodeWithAdditionalFields)inode).getAclFeature() : null;
if (aclFeature != null) {
List<AclEntry> featureEntries = aclFeature.getEntries();
// It's possible that the inode has a default ACL but no access ACL.
if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
checkAccessAcl(inode, snapshotId, access, mode, featureEntries);
return;
}
}
}
checkFsPermission(inode, snapshotId, access, mode);
}
private void checkFsPermission(INode inode, int snapshotId, FsAction access,
FsPermission mode) throws AccessControlException {
if (user.equals(inode.getUserName(snapshotId))) { //user class
if (mode.getUserAction().implies(access)) { return; }
}
@ -229,8 +266,88 @@ else if (groups.contains(inode.getGroupName(snapshotId))) { //group class
else { //other class
if (mode.getOtherAction().implies(access)) { return; }
}
throw new AccessControlException("Permission denied: user=" + user
+ ", access=" + access + ", inode=" + toAccessControlString(inode));
throw new AccessControlException(
toAccessControlString(inode, snapshotId, access, mode));
}
/**
* Checks requested access against an Access Control List. This method relies
* on finding the ACL data in the relevant portions of {@link FsPermission} and
* {@link AclFeature} as implemented in the logic of {@link AclStorage}. This
* method also relies on receiving the ACL entries in sorted order. This is
* assumed to be true, because the ACL modification methods in
* {@link AclTransformation} sort the resulting entries.
*
* More specifically, this method depends on these invariants in an ACL:
* - The list must be sorted.
* - Each entry in the list must be unique by scope + type + name.
* - There is exactly one each of the unnamed user/group/other entries.
* - The mask entry must not have a name.
* - The other entry must not have a name.
* - Default entries may be present, but they are ignored during enforcement.
*
* @param inode INode accessed inode
* @param snapshotId int snapshot ID
* @param access FsAction requested permission
* @param mode FsPermission mode from inode
* @param featureEntries List<AclEntry> ACL entries from AclFeature of inode
* @throws AccessControlException if the ACL denies permission
*/
private void checkAccessAcl(INode inode, int snapshotId, FsAction access,
FsPermission mode, List<AclEntry> featureEntries)
throws AccessControlException {
boolean foundMatch = false;
// Use owner entry from permission bits if user is owner.
if (user.equals(inode.getUserName(snapshotId))) {
if (mode.getUserAction().implies(access)) {
return;
}
foundMatch = true;
}
// Check named user and group entries if user was not denied by owner entry.
if (!foundMatch) {
for (AclEntry entry: featureEntries) {
if (entry.getScope() == AclEntryScope.DEFAULT) {
break;
}
AclEntryType type = entry.getType();
String name = entry.getName();
if (type == AclEntryType.USER) {
// Use named user entry with mask from permission bits applied if user
// matches name.
if (user.equals(name)) {
FsAction masked = entry.getPermission().and(mode.getGroupAction());
if (masked.implies(access)) {
return;
}
foundMatch = true;
}
} else if (type == AclEntryType.GROUP) {
// Use group entry (unnamed or named) with mask from permission bits
// applied if user is a member and entry grants access. If user is a
// member of multiple groups that have entries that grant access, then
// it doesn't matter which is chosen, so exit early after first match.
String group = name == null ? inode.getGroupName(snapshotId) : name;
if (groups.contains(group)) {
FsAction masked = entry.getPermission().and(mode.getGroupAction());
if (masked.implies(access)) {
return;
}
foundMatch = true;
}
}
}
}
// Use other entry if user was not denied by an earlier match.
if (!foundMatch && mode.getOtherAction().implies(access)) {
return;
}
throw new AccessControlException(
toAccessControlString(inode, snapshotId, access, mode, featureEntries));
}
/** Guarded by {@link FSNamesystem#readLock()} */