diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3b1a1e36bce..647083ebaaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -309,6 +309,8 @@ Release 2.0.4-beta - UNRELEASED NEW FEATURES IMPROVEMENTS + HDFS-4222. NN is unresponsive and loses heartbeats from DNs when + configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh) OPTIMIZATIONS diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b626612ec2d..37406fac37b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -300,6 +300,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private final boolean isPermissionEnabled; private final boolean persistBlocks; private final UserGroupInformation fsOwner; + private final String fsOwnerShortUserName; private final String supergroup; private final boolean standbyShouldCheckpoint; @@ -529,6 +530,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics(); this.fsOwner = UserGroupInformation.getCurrentUser(); + this.fsOwnerShortUserName = fsOwner.getShortUserName(); this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY, @@ -1111,9 +1113,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * Dump all metadata into specified file */ void metaSave(String filename) throws IOException { + checkSuperuserPrivilege(); writeLock(); try { - checkSuperuserPrivilege(); File file = new File(System.getProperty("hadoop.log.dir"), filename); PrintWriter out = new PrintWriter(new BufferedWriter( new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8))); @@ -1190,6 +1192,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException { HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -1197,7 +1200,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (isInSafeMode()) { throw new SafeModeException("Cannot set permission for " + src, safeMode); } - checkOwner(src); + checkOwner(pc, src); dir.setPermission(src, permission); if (isAuditEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(src, false); @@ -1236,6 +1239,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws AccessControlException, FileNotFoundException, SafeModeException, UnresolvedLinkException, IOException { HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -1243,14 +1247,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (isInSafeMode()) { throw new SafeModeException("Cannot set owner for " + src, safeMode); } - FSPermissionChecker pc = checkOwner(src); - if (!pc.isSuper) { - if (username != null && !pc.user.equals(username)) { - throw new AccessControlException("Non-super user cannot change owner."); + checkOwner(pc, src); + if (!pc.isSuperUser()) { + if (username != null && !pc.getUser().equals(username)) { + throw new AccessControlException("Non-super user cannot change owner"); } if (group != null && !pc.containsGroup(group)) { - throw new AccessControlException("User does not belong to " + group - + " ."); + throw new AccessControlException("User does not belong to " + group); } } dir.setOwner(src, username, group); @@ -1300,8 +1303,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, LocatedBlocks getBlockLocations(String src, long offset, long length, boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode) throws FileNotFoundException, UnresolvedLinkException, IOException { + FSPermissionChecker pc = getPermissionChecker(); try { - return getBlockLocationsInt(src, offset, length, doAccessTime, + return getBlockLocationsInt(pc, src, offset, length, doAccessTime, needBlockToken, checkSafeMode); } catch (AccessControlException e) { if (isAuditEnabled() && isExternalInvocation()) { @@ -1313,11 +1317,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } - private LocatedBlocks getBlockLocationsInt(String src, long offset, long length, - boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode) + private LocatedBlocks getBlockLocationsInt(FSPermissionChecker pc, + String src, long offset, long length, boolean doAccessTime, + boolean needBlockToken, boolean checkSafeMode) throws FileNotFoundException, UnresolvedLinkException, IOException { if (isPermissionEnabled) { - checkPathAccess(src, FsAction.READ); + checkPathAccess(pc, src, FsAction.READ); } if (offset < 0) { @@ -1447,13 +1452,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { throw new SafeModeException("Cannot concat " + target, safeMode); } - concatInternal(target, srcs); + concatInternal(pc, target, srcs); if (isAuditEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(target, false); } @@ -1469,18 +1475,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } /** See {@link #concat(String, String[])} */ - private void concatInternal(String target, String [] srcs) + private void concatInternal(FSPermissionChecker pc, String target, String [] srcs) throws IOException, UnresolvedLinkException { assert hasWriteLock(); // write permission for the target if (isPermissionEnabled) { - checkPathAccess(target, FsAction.WRITE); + checkPathAccess(pc, target, FsAction.WRITE); // and srcs for(String aSrc: srcs) { - checkPathAccess(aSrc, FsAction.READ); // read the file - checkParentAccess(aSrc, FsAction.WRITE); // for delete + checkPathAccess(pc, aSrc, FsAction.READ); // read the file + checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete } } @@ -1597,13 +1603,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new IOException("Access time for hdfs is not configured. " + " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter."); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); // Write access is required to set access and modification times if (isPermissionEnabled) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } INode inode = dir.getINode(src); if (inode != null) { @@ -1644,6 +1651,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, PermissionStatus dirPerms, boolean createParent) throws IOException, UnresolvedLinkException { HdfsFileStatus resultingStat = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -1651,7 +1659,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if (!createParent) { verifyParentDir(link); } - createSymlinkInternal(target, link, dirPerms, createParent); + createSymlinkInternal(pc, target, link, dirPerms, createParent); if (isAuditEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(link, false); } @@ -1669,8 +1677,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** * Create a symbolic link. */ - private void createSymlinkInternal(String target, String link, - PermissionStatus dirPerms, boolean createParent) + private void createSymlinkInternal(FSPermissionChecker pc, String target, + String link, PermissionStatus dirPerms, boolean createParent) throws IOException, UnresolvedLinkException { assert hasWriteLock(); if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -1688,7 +1696,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, +" either because the filename is invalid or the file exists"); } if (isPermissionEnabled) { - checkAncestorAccess(link, FsAction.WRITE); + checkAncestorAccess(pc, link, FsAction.WRITE); } // validate that we have enough inodes. checkFsObjectLimit(); @@ -1727,17 +1735,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private boolean setReplicationInt(final String src, final short replication) throws IOException { blockManager.verifyReplication(src, replication, null); - final boolean isFile; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); - if (isInSafeMode()) { throw new SafeModeException("Cannot set replication for " + src, safeMode); } if (isPermissionEnabled) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } final short[] oldReplication = new short[1]; @@ -1761,11 +1768,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, long getPreferredBlockSize(String filename) throws IOException, UnresolvedLinkException { + FSPermissionChecker pc = getPermissionChecker(); readLock(); try { checkOperation(OperationCategory.READ); if (isPermissionEnabled) { - checkTraverse(filename); + checkTraverse(pc, filename); } return dir.getPreferredBlockSize(filename); } finally { @@ -1826,11 +1834,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, FileNotFoundException, ParentNotDirectoryException, IOException { boolean skipSync = false; final HdfsFileStatus stat; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); - - startFileInternal(src, permissions, holder, clientMachine, flag, + startFileInternal(pc, src, permissions, holder, clientMachine, flag, createParent, replication, blockSize); stat = dir.getFileInfo(src, false); } catch (StandbyException se) { @@ -1869,7 +1877,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * * @return the last block locations if the block is partial or null otherwise */ - private LocatedBlock startFileInternal(String src, + private LocatedBlock startFileInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet flag, boolean createParent, short replication, long blockSize) throws SafeModeException, FileAlreadyExistsException, @@ -1902,9 +1910,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean append = flag.contains(CreateFlag.APPEND); if (isPermissionEnabled) { if (append || (overwrite && pathExists)) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } else { - checkAncestorAccess(src, FsAction.WRITE); + checkAncestorAccess(pc, src, FsAction.WRITE); } } @@ -2027,6 +2035,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean recoverLease(String src, String holder, String clientMachine) throws IOException { boolean skipSync = false; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2044,7 +2053,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return true; } if (isPermissionEnabled) { - checkPathAccess(src, FsAction.WRITE); + checkPathAccess(pc, src, FsAction.WRITE); } recoverLeaseInternal(inode, src, holder, clientMachine, true); @@ -2167,11 +2176,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, DFS_SUPPORT_APPEND_KEY + " configuration option to enable it."); } LocatedBlock lb = null; + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); - lb = startFileInternal(src, null, holder, clientMachine, + lb = startFileInternal(pc, src, null, holder, clientMachine, EnumSet.of(CreateFlag.APPEND), false, blockManager.maxReplication, 0); } catch (StandbyException se) { @@ -2708,11 +2718,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); - status = renameToInternal(src, dst); + status = renameToInternal(pc, src, dst); if (status && isAuditEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(dst, false); } @@ -2730,7 +2741,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** @deprecated See {@link #renameTo(String, String)} */ @Deprecated - private boolean renameToInternal(String src, String dst) + private boolean renameToInternal(FSPermissionChecker pc, String src, String dst) throws IOException, UnresolvedLinkException { assert hasWriteLock(); if (isInSafeMode()) { @@ -2746,8 +2757,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // of rewriting the dst String actualdst = dir.isDir(dst)? dst + Path.SEPARATOR + new Path(src).getName(): dst; - checkParentAccess(src, FsAction.WRITE); - checkAncestorAccess(actualdst, FsAction.WRITE); + checkParentAccess(pc, src, FsAction.WRITE); + checkAncestorAccess(pc, actualdst, FsAction.WRITE); } if (dir.renameTo(src, dst)) { @@ -2765,11 +2776,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - " + src + " to " + dst); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); - - renameToInternal(src, dst, options); + renameToInternal(pc, src, dst, options); if (isAuditEnabled() && isExternalInvocation()) { resultingStat = dir.getFileInfo(dst, false); } @@ -2787,7 +2798,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } } - private void renameToInternal(String src, String dst, + private void renameToInternal(FSPermissionChecker pc, String src, String dst, Options.Rename... options) throws IOException { assert hasWriteLock(); if (isInSafeMode()) { @@ -2797,8 +2808,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new InvalidPathException("Invalid name: " + dst); } if (isPermissionEnabled) { - checkParentAccess(src, FsAction.WRITE); - checkAncestorAccess(dst, FsAction.WRITE); + checkParentAccess(pc, src, FsAction.WRITE); + checkAncestorAccess(pc, dst, FsAction.WRITE); } dir.renameTo(src, dst, options); @@ -2840,6 +2851,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return status; } + private FSPermissionChecker getPermissionChecker() + throws AccessControlException { + return new FSPermissionChecker(fsOwnerShortUserName, supergroup); + } /** * Remove a file/directory from the namespace. *

@@ -2856,7 +2871,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws AccessControlException, SafeModeException, UnresolvedLinkException, IOException { BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); - + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2867,7 +2882,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new IOException(src + " is non empty"); } if (enforcePermission && isPermissionEnabled) { - checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL); + checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL); } // Unlink the target directory from directory tree if (!dir.delete(src, collectedBlocks)) { @@ -2984,9 +2999,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throws AccessControlException, UnresolvedLinkException, StandbyException, IOException { HdfsFileStatus stat = null; - + FSPermissionChecker pc = getPermissionChecker(); readLock(); - try { checkOperation(OperationCategory.READ); @@ -2994,7 +3008,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new InvalidPathException("Invalid file name: " + src); } if (isPermissionEnabled) { - checkTraverse(src); + checkTraverse(pc, src); } stat = dir.getFileInfo(src, resolveLink); } catch (AccessControlException e) { @@ -3038,11 +3052,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats, if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src); } + FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { checkOperation(OperationCategory.WRITE); - - status = mkdirsInternal(src, permissions, createParent); + status = mkdirsInternal(pc, src, permissions, createParent); } finally { writeUnlock(); } @@ -3059,7 +3073,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, /** * Create all the necessary directories */ - private boolean mkdirsInternal(String src, + private boolean mkdirsInternal(FSPermissionChecker pc, String src, PermissionStatus permissions, boolean createParent) throws IOException, UnresolvedLinkException { assert hasWriteLock(); @@ -3067,7 +3081,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new SafeModeException("Cannot create directory " + src, safeMode); } if (isPermissionEnabled) { - checkTraverse(src); + checkTraverse(pc, src); } if (dir.isDir(src)) { // all the users of mkdirs() are used to expect 'true' even if @@ -3078,7 +3092,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, throw new InvalidPathException(src); } if (isPermissionEnabled) { - checkAncestorAccess(src, FsAction.WRITE); + checkAncestorAccess(pc, src, FsAction.WRITE); } if (!createParent) { verifyParentDir(src); @@ -3097,12 +3111,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, ContentSummary getContentSummary(String src) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, StandbyException { + FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName, + supergroup); readLock(); try { checkOperation(OperationCategory.READ); - if (isPermissionEnabled) { - checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE); + checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE); } return dir.getContentSummary(src); } finally { @@ -3117,15 +3132,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ void setQuota(String path, long nsQuota, long dsQuota) throws IOException, UnresolvedLinkException { + checkSuperuserPrivilege(); writeLock(); try { checkOperation(OperationCategory.WRITE); if (isInSafeMode()) { throw new SafeModeException("Cannot set quota on " + path, safeMode); } - if (isPermissionEnabled) { - checkSuperuserPrivilege(); - } dir.setQuota(path, nsQuota, dsQuota); } finally { writeUnlock(); @@ -3494,15 +3507,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean needLocation) throws AccessControlException, UnresolvedLinkException, IOException { DirectoryListing dl; + FSPermissionChecker pc = getPermissionChecker(); readLock(); try { checkOperation(OperationCategory.READ); if (isPermissionEnabled) { if (dir.isDir(src)) { - checkPathAccess(src, FsAction.READ_EXECUTE); + checkPathAccess(pc, src, FsAction.READ_EXECUTE); } else { - checkTraverse(src); + checkTraverse(pc, src); } } if (isAuditEnabled() && isExternalInvocation()) { @@ -3803,9 +3817,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws IOException if */ void saveNamespace() throws AccessControlException, IOException { + checkSuperuserPrivilege(); readLock(); try { - checkSuperuserPrivilege(); if (!isInSafeMode()) { throw new IOException("Safe mode should be turned ON " + "in order to create namespace image."); @@ -3824,9 +3838,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, * @throws AccessControlException if superuser privilege is violated. */ boolean restoreFailedStorage(String arg) throws AccessControlException { + checkSuperuserPrivilege(); writeLock(); try { - checkSuperuserPrivilege(); // if it is disabled - enable it and vice versa. if(arg.equals("check")) @@ -3846,10 +3860,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } void finalizeUpgrade() throws IOException { + checkSuperuserPrivilege(); writeLock(); try { checkOperation(OperationCategory.WRITE); - checkSuperuserPrivilege(); getFSImage().finalizeUpgrade(); } finally { writeUnlock(); @@ -4585,10 +4599,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } CheckpointSignature rollEditLog() throws IOException { + checkSuperuserPrivilege(); writeLock(); try { checkOperation(OperationCategory.JOURNAL); - checkSuperuserPrivilege(); if (isInSafeMode()) { throw new SafeModeException("Log not rolled", safeMode); } @@ -4639,61 +4653,64 @@ public class FSNamesystem implements Namesystem, FSClusterStats, return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission); } - private FSPermissionChecker checkOwner(String path - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, true, null, null, null, null); + private void checkOwner(FSPermissionChecker pc, String path) + throws AccessControlException, UnresolvedLinkException { + checkPermission(pc, path, true, null, null, null, null); } - private FSPermissionChecker checkPathAccess(String path, FsAction access - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, null, null, access, null); + private void checkPathAccess(FSPermissionChecker pc, + String path, FsAction access) throws AccessControlException, + UnresolvedLinkException { + checkPermission(pc, path, false, null, null, access, null); } - private FSPermissionChecker checkParentAccess(String path, FsAction access - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, null, access, null, null); + private void checkParentAccess(FSPermissionChecker pc, + String path, FsAction access) throws AccessControlException, + UnresolvedLinkException { + checkPermission(pc, path, false, null, access, null, null); } - private FSPermissionChecker checkAncestorAccess(String path, FsAction access - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, access, null, null, null); + private void checkAncestorAccess(FSPermissionChecker pc, + String path, FsAction access) throws AccessControlException, + UnresolvedLinkException { + checkPermission(pc, path, false, access, null, null, null); } - private FSPermissionChecker checkTraverse(String path - ) throws AccessControlException, UnresolvedLinkException { - return checkPermission(path, false, null, null, null, null); + private void checkTraverse(FSPermissionChecker pc, String path) + throws AccessControlException, UnresolvedLinkException { + checkPermission(pc, path, false, null, null, null, null); } @Override - public void checkSuperuserPrivilege() throws AccessControlException { + public void checkSuperuserPrivilege() + throws AccessControlException { if (isPermissionEnabled) { - FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup); + FSPermissionChecker pc = getPermissionChecker(); + pc.checkSuperuserPrivilege(); } } /** - * Check whether current user have permissions to access the path. - * For more details of the parameters, see - * {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}. + * Check whether current user have permissions to access the path. For more + * details of the parameters, see + * {@link FSPermissionChecker#checkPermission()}. */ - private FSPermissionChecker checkPermission(String path, boolean doCheckOwner, - FsAction ancestorAccess, FsAction parentAccess, FsAction access, - FsAction subAccess) throws AccessControlException, UnresolvedLinkException { - FSPermissionChecker pc = new FSPermissionChecker( - fsOwner.getShortUserName(), supergroup); - if (!pc.isSuper) { + private void checkPermission(FSPermissionChecker pc, + String path, boolean doCheckOwner, FsAction ancestorAccess, + FsAction parentAccess, FsAction access, FsAction subAccess) + throws AccessControlException, UnresolvedLinkException { + if (!pc.isSuperUser()) { dir.waitForReady(); readLock(); try { - pc.checkPermission(path, dir.rootDir, doCheckOwner, - ancestorAccess, parentAccess, access, subAccess); + pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess, + parentAccess, access, subAccess); } finally { readUnlock(); - } + } } - return pc; } - + /** * Check to see if we have exceeded the limit on the number * of inodes. @@ -5137,16 +5154,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ Collection listCorruptFileBlocks(String path, String[] cookieTab) throws IOException { - + checkSuperuserPrivilege(); readLock(); try { checkOperation(OperationCategory.READ); - if (!isPopulatingReplQueues()) { throw new IOException("Cannot run listCorruptFileBlocks because " + "replication queues have not been initialized."); } - checkSuperuserPrivilege(); // print a limited # of corrupt files per call int count = 0; ArrayList corruptFiles = new ArrayList(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java index 91ebc968a04..d88bfd87960 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.Stack; @@ -31,14 +32,20 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; -/** Perform permission checking in {@link FSNamesystem}. */ +/** + * Class that helps in checking file system permission. + * The state of this class need not be synchronized as it has data structures that + * are read-only. + * + * Some of the helper methods are gaurded by {@link FSNamesystem#readLock()}. + */ class FSPermissionChecker { static final Log LOG = LogFactory.getLog(UserGroupInformation.class); - private final UserGroupInformation ugi; - public final String user; - private final Set groups = new HashSet(); - public final boolean isSuper; + private final String user; + /** A set with group namess. Not synchronized since it is unmodifiable */ + private final Set groups; + private final boolean isSuper; FSPermissionChecker(String fsOwner, String supergroup ) throws AccessControlException{ @@ -47,10 +54,9 @@ class FSPermissionChecker { } catch (IOException e) { throw new AccessControlException(e); } - - groups.addAll(Arrays.asList(ugi.getGroupNames())); + HashSet s = new HashSet(Arrays.asList(ugi.getGroupNames())); + groups = Collections.unmodifiableSet(s); user = ugi.getShortUserName(); - isSuper = user.equals(fsOwner) || groups.contains(supergroup); } @@ -60,20 +66,23 @@ class FSPermissionChecker { */ public boolean containsGroup(String group) {return groups.contains(group);} + public String getUser() { + return user; + } + + public boolean isSuperUser() { + return isSuper; + } + /** * Verify if the caller has the required permission. This will result into * an exception if the caller is not allowed to access the resource. - * @param owner owner of the system - * @param supergroup supergroup of the system */ - public static void checkSuperuserPrivilege(UserGroupInformation owner, - String supergroup) - throws AccessControlException { - FSPermissionChecker checker = - new FSPermissionChecker(owner.getShortUserName(), supergroup); - if (!checker.isSuper) { + public void checkSuperuserPrivilege() + throws AccessControlException { + if (!isSuper) { throw new AccessControlException("Access denied for user " - + checker.user + ". Superuser privilege is required"); + + user + ". Superuser privilege is required"); } } @@ -103,9 +112,11 @@ class FSPermissionChecker { * @param subAccess If path is a directory, * it is the access required of the path and all the sub-directories. * If path is not a directory, there is no effect. - * @return a PermissionChecker object which caches data for later use. * @throws AccessControlException * @throws UnresolvedLinkException + * + * Guarded by {@link FSNamesystem#readLock()} + * Caller of this method must hold that lock. */ void checkPermission(String path, INodeDirectory root, boolean doCheckOwner, FsAction ancestorAccess, FsAction parentAccess, FsAction access, @@ -148,6 +159,7 @@ class FSPermissionChecker { } } + /** Guarded by {@link FSNamesystem#readLock()} */ private void checkOwner(INode inode) throws AccessControlException { if (inode != null && user.equals(inode.getUserName())) { return; @@ -155,6 +167,7 @@ class FSPermissionChecker { throw new AccessControlException("Permission denied"); } + /** Guarded by {@link FSNamesystem#readLock()} */ private void checkTraverse(INode[] inodes, int last ) throws AccessControlException { for(int j = 0; j <= last; j++) { @@ -162,6 +175,7 @@ class FSPermissionChecker { } } + /** Guarded by {@link FSNamesystem#readLock()} */ private void checkSubAccess(INode inode, FsAction access ) throws AccessControlException { if (inode == null || !inode.isDirectory()) { @@ -181,11 +195,13 @@ class FSPermissionChecker { } } + /** Guarded by {@link FSNamesystem#readLock()} */ private void check(INode[] inodes, int i, FsAction access ) throws AccessControlException { check(i >= 0? inodes[i]: null, access); } + /** Guarded by {@link FSNamesystem#readLock()} */ private void check(INode inode, FsAction access ) throws AccessControlException { if (inode == null) { @@ -206,7 +222,9 @@ class FSPermissionChecker { + ", access=" + access + ", inode=" + inode); } - private void checkStickyBit(INode parent, INode inode) throws AccessControlException { + /** Guarded by {@link FSNamesystem#readLock()} */ + private void checkStickyBit(INode parent, INode inode) + throws AccessControlException { if(!parent.getFsPermission().getStickyBit()) { return; }