HDFS-7420. Delegate permission checks to FSDirectory. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-11-21 11:01:14 -08:00
parent 3114d4731d
commit 23dacb3892
4 changed files with 95 additions and 30 deletions

View File

@ -383,6 +383,8 @@ Release 2.7.0 - UNRELEASED
HDFS-7415. Move FSNameSystem.resolvePath() to FSDirectory. (wheat9)
HDFS-7420. Delegate permission checks to FSDirectory. (wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -52,6 +52,7 @@
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -96,6 +97,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
@ -152,6 +154,8 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private final ReentrantReadWriteLock dirLock;
private final boolean isPermissionEnabled;
private final String fsOwnerShortUserName;
private final String supergroup;
// utility methods to acquire and release read lock and write lock
void readLock() {
@ -195,13 +199,19 @@ public int getWriteHoldCount() {
*/
private final NameCache<ByteArray> nameCache;
FSDirectory(FSNamesystem ns, Configuration conf) {
FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
this.dirLock = new ReentrantReadWriteLock(true); // fair
rootDir = createRoot(ns);
inodeMap = INodeMap.newInstance(rootDir);
this.isPermissionEnabled = conf.getBoolean(
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
this.fsOwnerShortUserName =
UserGroupInformation.getCurrentUser().getShortUserName();
this.supergroup = conf.get(
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
int configuredLimit = conf.getInt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
@ -3331,4 +3341,76 @@ INodesInPath getINodesInPath4Write(String src, boolean resolveLink)
}
return inodesInPath;
}
FSPermissionChecker getPermissionChecker()
throws AccessControlException {
try {
return new FSPermissionChecker(fsOwnerShortUserName, supergroup,
NameNode.getRemoteUser());
} catch (IOException ioe) {
throw new AccessControlException(ioe);
}
}
void checkOwner(FSPermissionChecker pc, String path)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, true, null, null, null, null);
}
void checkPathAccess(FSPermissionChecker pc, String path,
FsAction access)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, false, null, null, access, null);
}
void checkParentAccess(
FSPermissionChecker pc, String path, FsAction access)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, false, null, access, null, null);
}
void checkAncestorAccess(
FSPermissionChecker pc, String path, FsAction access)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, false, access, null, null, null);
}
void checkTraverse(FSPermissionChecker pc, String path)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, false, null, null, null, null);
}
/**
* Check whether current user have permissions to access the path. For more
* details of the parameters, see
* {@link FSPermissionChecker#checkPermission}.
*/
private void checkPermission(
FSPermissionChecker pc, String path, boolean doCheckOwner,
FsAction ancestorAccess, FsAction parentAccess, FsAction access,
FsAction subAccess)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, doCheckOwner, ancestorAccess,
parentAccess, access, subAccess, false, true);
}
/**
* Check whether current user have permissions to access the path. For more
* details of the parameters, see
* {@link FSPermissionChecker#checkPermission}.
*/
void checkPermission(
FSPermissionChecker pc, String path, boolean doCheckOwner,
FsAction ancestorAccess, FsAction parentAccess, FsAction access,
FsAction subAccess, boolean ignoreEmptyDir, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException {
if (!pc.isSuperUser()) {
readLock();
try {
pc.checkPermission(path, this, doCheckOwner, ancestorAccess,
parentAccess, access, subAccess, ignoreEmptyDir, resolveLink);
} finally {
readUnlock();
}
}
}
}

View File

@ -404,7 +404,6 @@ private void logAuditEvent(boolean succeeded,
static int BLOCK_DELETION_INCREMENT = 1000;
private final boolean isPermissionEnabled;
private final UserGroupInformation fsOwner;
private final String fsOwnerShortUserName;
private final String supergroup;
private final boolean standbyShouldCheckpoint;
@ -777,7 +776,6 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
DFS_STORAGE_POLICY_ENABLED_DEFAULT);
this.fsOwner = UserGroupInformation.getCurrentUser();
this.fsOwnerShortUserName = fsOwner.getShortUserName();
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
@ -3922,11 +3920,7 @@ private boolean deleteInt(String src, boolean recursive, boolean logRetryCache)
private FSPermissionChecker getPermissionChecker()
throws AccessControlException {
try {
return new FSPermissionChecker(fsOwnerShortUserName, supergroup, getRemoteUser());
} catch (IOException ioe) {
throw new AccessControlException(ioe);
}
return dir.getPermissionChecker();
}
/**
@ -6411,13 +6405,13 @@ PermissionStatus createFsOwnerPermissions(FsPermission permission) {
private void checkOwner(FSPermissionChecker pc, String path)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, true, null, null, null, null);
dir.checkOwner(pc, path);
}
private void checkPathAccess(FSPermissionChecker pc,
String path, FsAction access) throws AccessControlException,
UnresolvedLinkException {
checkPermission(pc, path, false, null, null, access, null);
dir.checkPathAccess(pc, path, access);
}
private void checkUnreadableBySuperuser(FSPermissionChecker pc,
@ -6438,18 +6432,18 @@ private void checkUnreadableBySuperuser(FSPermissionChecker pc,
private void checkParentAccess(FSPermissionChecker pc,
String path, FsAction access) throws AccessControlException,
UnresolvedLinkException {
checkPermission(pc, path, false, null, access, null, null);
dir.checkParentAccess(pc, path, access);
}
private void checkAncestorAccess(FSPermissionChecker pc,
String path, FsAction access) throws AccessControlException,
UnresolvedLinkException {
checkPermission(pc, path, false, access, null, null, null);
dir.checkAncestorAccess(pc, path, access);
}
private void checkTraverse(FSPermissionChecker pc, String path)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, false, null, null, null, null);
dir.checkTraverse(pc, path);
}
@Override
@ -6474,26 +6468,13 @@ private void checkPermission(FSPermissionChecker pc,
parentAccess, access, subAccess, false, true);
}
/**
* Check whether current user have permissions to access the path. For more
* details of the parameters, see
* {@link FSPermissionChecker#checkPermission}.
*/
private void checkPermission(FSPermissionChecker pc,
String path, boolean doCheckOwner, FsAction ancestorAccess,
FsAction parentAccess, FsAction access, FsAction subAccess,
boolean ignoreEmptyDir, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException {
if (!pc.isSuperUser()) {
waitForLoadingFSImage();
readLock();
try {
pc.checkPermission(path, dir, doCheckOwner, ancestorAccess,
parentAccess, access, subAccess, ignoreEmptyDir, resolveLink);
} finally {
readUnlock();
}
}
dir.checkPermission(pc, path, doCheckOwner, ancestorAccess, parentAccess,
access, subAccess, ignoreEmptyDir, resolveLink);
}
/**

View File

@ -75,7 +75,7 @@ public class TestFSPermissionChecker {
private INodeDirectory inodeRoot;
@Before
public void setUp() {
public void setUp() throws IOException {
Configuration conf = new Configuration();
FSNamesystem fsn = mock(FSNamesystem.class);
doAnswer(new Answer() {