HDFS-4222. NN is unresponsive and loses heartbeats from DNs when configured to use LDAP and LDAP has issues. Contributed by Xiaobo Peng and Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1448801 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2013-02-21 21:02:39 +00:00
parent c5296502ba
commit cdb292f44c
3 changed files with 151 additions and 116 deletions

View File

@ -309,6 +309,8 @@ Release 2.0.4-beta - UNRELEASED
NEW FEATURES
IMPROVEMENTS
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
configured to use LDAP and LDAP has issues. (Xiaobo Peng, suresh)
OPTIMIZATIONS

View File

@ -300,6 +300,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final boolean isPermissionEnabled;
private final boolean persistBlocks;
private final UserGroupInformation fsOwner;
private final String fsOwnerShortUserName;
private final String supergroup;
private final boolean standbyShouldCheckpoint;
@ -529,6 +530,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsOwner = UserGroupInformation.getCurrentUser();
this.fsOwnerShortUserName = fsOwner.getShortUserName();
this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
@ -1111,9 +1113,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* Dump all metadata into specified file
*/
void metaSave(String filename) throws IOException {
checkSuperuserPrivilege();
writeLock();
try {
checkSuperuserPrivilege();
File file = new File(System.getProperty("hadoop.log.dir"), filename);
PrintWriter out = new PrintWriter(new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8)));
@ -1190,6 +1192,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -1197,7 +1200,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (isInSafeMode()) {
throw new SafeModeException("Cannot set permission for " + src, safeMode);
}
checkOwner(src);
checkOwner(pc, src);
dir.setPermission(src, permission);
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(src, false);
@ -1236,6 +1239,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, FileNotFoundException, SafeModeException,
UnresolvedLinkException, IOException {
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -1243,14 +1247,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (isInSafeMode()) {
throw new SafeModeException("Cannot set owner for " + src, safeMode);
}
FSPermissionChecker pc = checkOwner(src);
if (!pc.isSuper) {
if (username != null && !pc.user.equals(username)) {
throw new AccessControlException("Non-super user cannot change owner.");
checkOwner(pc, src);
if (!pc.isSuperUser()) {
if (username != null && !pc.getUser().equals(username)) {
throw new AccessControlException("Non-super user cannot change owner");
}
if (group != null && !pc.containsGroup(group)) {
throw new AccessControlException("User does not belong to " + group
+ " .");
throw new AccessControlException("User does not belong to " + group);
}
}
dir.setOwner(src, username, group);
@ -1300,8 +1303,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
throws FileNotFoundException, UnresolvedLinkException, IOException {
FSPermissionChecker pc = getPermissionChecker();
try {
return getBlockLocationsInt(src, offset, length, doAccessTime,
return getBlockLocationsInt(pc, src, offset, length, doAccessTime,
needBlockToken, checkSafeMode);
} catch (AccessControlException e) {
if (isAuditEnabled() && isExternalInvocation()) {
@ -1313,11 +1317,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
private LocatedBlocks getBlockLocationsInt(String src, long offset, long length,
boolean doAccessTime, boolean needBlockToken, boolean checkSafeMode)
private LocatedBlocks getBlockLocationsInt(FSPermissionChecker pc,
String src, long offset, long length, boolean doAccessTime,
boolean needBlockToken, boolean checkSafeMode)
throws FileNotFoundException, UnresolvedLinkException, IOException {
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.READ);
checkPathAccess(pc, src, FsAction.READ);
}
if (offset < 0) {
@ -1447,13 +1452,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot concat " + target, safeMode);
}
concatInternal(target, srcs);
concatInternal(pc, target, srcs);
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(target, false);
}
@ -1469,18 +1475,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
/** See {@link #concat(String, String[])} */
private void concatInternal(String target, String [] srcs)
private void concatInternal(FSPermissionChecker pc, String target, String [] srcs)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
// write permission for the target
if (isPermissionEnabled) {
checkPathAccess(target, FsAction.WRITE);
checkPathAccess(pc, target, FsAction.WRITE);
// and srcs
for(String aSrc: srcs) {
checkPathAccess(aSrc, FsAction.READ); // read the file
checkParentAccess(aSrc, FsAction.WRITE); // for delete
checkPathAccess(pc, aSrc, FsAction.READ); // read the file
checkParentAccess(pc, aSrc, FsAction.WRITE); // for delete
}
}
@ -1597,13 +1603,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new IOException("Access time for hdfs is not configured. " +
" Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
}
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
// Write access is required to set access and modification times
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
checkPathAccess(pc, src, FsAction.WRITE);
}
INode inode = dir.getINode(src);
if (inode != null) {
@ -1644,6 +1651,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -1651,7 +1659,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if (!createParent) {
verifyParentDir(link);
}
createSymlinkInternal(target, link, dirPerms, createParent);
createSymlinkInternal(pc, target, link, dirPerms, createParent);
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(link, false);
}
@ -1669,8 +1677,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/**
* Create a symbolic link.
*/
private void createSymlinkInternal(String target, String link,
PermissionStatus dirPerms, boolean createParent)
private void createSymlinkInternal(FSPermissionChecker pc, String target,
String link, PermissionStatus dirPerms, boolean createParent)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -1688,7 +1696,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+" either because the filename is invalid or the file exists");
}
if (isPermissionEnabled) {
checkAncestorAccess(link, FsAction.WRITE);
checkAncestorAccess(pc, link, FsAction.WRITE);
}
// validate that we have enough inodes.
checkFsObjectLimit();
@ -1727,17 +1735,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private boolean setReplicationInt(final String src, final short replication)
throws IOException {
blockManager.verifyReplication(src, replication, null);
final boolean isFile;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set replication for " + src, safeMode);
}
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
checkPathAccess(pc, src, FsAction.WRITE);
}
final short[] oldReplication = new short[1];
@ -1761,11 +1768,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
long getPreferredBlockSize(String filename)
throws IOException, UnresolvedLinkException {
FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkTraverse(filename);
checkTraverse(pc, filename);
}
return dir.getPreferredBlockSize(filename);
} finally {
@ -1826,11 +1834,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FileNotFoundException, ParentNotDirectoryException, IOException {
boolean skipSync = false;
final HdfsFileStatus stat;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
startFileInternal(src, permissions, holder, clientMachine, flag,
startFileInternal(pc, src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
stat = dir.getFileInfo(src, false);
} catch (StandbyException se) {
@ -1869,7 +1877,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock startFileInternal(String src,
private LocatedBlock startFileInternal(FSPermissionChecker pc, String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize) throws SafeModeException, FileAlreadyExistsException,
@ -1902,9 +1910,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean append = flag.contains(CreateFlag.APPEND);
if (isPermissionEnabled) {
if (append || (overwrite && pathExists)) {
checkPathAccess(src, FsAction.WRITE);
checkPathAccess(pc, src, FsAction.WRITE);
} else {
checkAncestorAccess(src, FsAction.WRITE);
checkAncestorAccess(pc, src, FsAction.WRITE);
}
}
@ -2027,6 +2035,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean recoverLease(String src, String holder, String clientMachine)
throws IOException {
boolean skipSync = false;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2044,7 +2053,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return true;
}
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
checkPathAccess(pc, src, FsAction.WRITE);
}
recoverLeaseInternal(inode, src, holder, clientMachine, true);
@ -2167,11 +2176,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_SUPPORT_APPEND_KEY + " configuration option to enable it.");
}
LocatedBlock lb = null;
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
lb = startFileInternal(src, null, holder, clientMachine,
lb = startFileInternal(pc, src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, blockManager.maxReplication, 0);
} catch (StandbyException se) {
@ -2708,11 +2718,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
" to " + dst);
}
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
status = renameToInternal(src, dst);
status = renameToInternal(pc, src, dst);
if (status && isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(dst, false);
}
@ -2730,7 +2741,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** @deprecated See {@link #renameTo(String, String)} */
@Deprecated
private boolean renameToInternal(String src, String dst)
private boolean renameToInternal(FSPermissionChecker pc, String src, String dst)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
if (isInSafeMode()) {
@ -2746,8 +2757,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// of rewriting the dst
String actualdst = dir.isDir(dst)?
dst + Path.SEPARATOR + new Path(src).getName(): dst;
checkParentAccess(src, FsAction.WRITE);
checkAncestorAccess(actualdst, FsAction.WRITE);
checkParentAccess(pc, src, FsAction.WRITE);
checkAncestorAccess(pc, actualdst, FsAction.WRITE);
}
if (dir.renameTo(src, dst)) {
@ -2765,11 +2776,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options - "
+ src + " to " + dst);
}
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
renameToInternal(src, dst, options);
renameToInternal(pc, src, dst, options);
if (isAuditEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(dst, false);
}
@ -2787,7 +2798,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
private void renameToInternal(String src, String dst,
private void renameToInternal(FSPermissionChecker pc, String src, String dst,
Options.Rename... options) throws IOException {
assert hasWriteLock();
if (isInSafeMode()) {
@ -2797,8 +2808,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new InvalidPathException("Invalid name: " + dst);
}
if (isPermissionEnabled) {
checkParentAccess(src, FsAction.WRITE);
checkAncestorAccess(dst, FsAction.WRITE);
checkParentAccess(pc, src, FsAction.WRITE);
checkAncestorAccess(pc, dst, FsAction.WRITE);
}
dir.renameTo(src, dst, options);
@ -2840,6 +2851,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return status;
}
private FSPermissionChecker getPermissionChecker()
throws AccessControlException {
return new FSPermissionChecker(fsOwnerShortUserName, supergroup);
}
/**
* Remove a file/directory from the namespace.
* <p>
@ -2856,7 +2871,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, SafeModeException, UnresolvedLinkException,
IOException {
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2867,7 +2882,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new IOException(src + " is non empty");
}
if (enforcePermission && isPermissionEnabled) {
checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
checkPermission(pc, src, false, null, FsAction.WRITE, null, FsAction.ALL);
}
// Unlink the target directory from directory tree
if (!dir.delete(src, collectedBlocks)) {
@ -2984,9 +2999,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, UnresolvedLinkException,
StandbyException, IOException {
HdfsFileStatus stat = null;
FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
@ -2994,7 +3008,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new InvalidPathException("Invalid file name: " + src);
}
if (isPermissionEnabled) {
checkTraverse(src);
checkTraverse(pc, src);
}
stat = dir.getFileInfo(src, resolveLink);
} catch (AccessControlException e) {
@ -3038,11 +3052,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
}
FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
status = mkdirsInternal(src, permissions, createParent);
status = mkdirsInternal(pc, src, permissions, createParent);
} finally {
writeUnlock();
}
@ -3059,7 +3073,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/**
* Create all the necessary directories
*/
private boolean mkdirsInternal(String src,
private boolean mkdirsInternal(FSPermissionChecker pc, String src,
PermissionStatus permissions, boolean createParent)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
@ -3067,7 +3081,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new SafeModeException("Cannot create directory " + src, safeMode);
}
if (isPermissionEnabled) {
checkTraverse(src);
checkTraverse(pc, src);
}
if (dir.isDir(src)) {
// all the users of mkdirs() are used to expect 'true' even if
@ -3078,7 +3092,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throw new InvalidPathException(src);
}
if (isPermissionEnabled) {
checkAncestorAccess(src, FsAction.WRITE);
checkAncestorAccess(pc, src, FsAction.WRITE);
}
if (!createParent) {
verifyParentDir(src);
@ -3097,12 +3111,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
ContentSummary getContentSummary(String src) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException, StandbyException {
FSPermissionChecker pc = new FSPermissionChecker(fsOwnerShortUserName,
supergroup);
readLock();
try {
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
checkPermission(pc, src, false, null, null, null, FsAction.READ_EXECUTE);
}
return dir.getContentSummary(src);
} finally {
@ -3117,15 +3132,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void setQuota(String path, long nsQuota, long dsQuota)
throws IOException, UnresolvedLinkException {
checkSuperuserPrivilege();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set quota on " + path, safeMode);
}
if (isPermissionEnabled) {
checkSuperuserPrivilege();
}
dir.setQuota(path, nsQuota, dsQuota);
} finally {
writeUnlock();
@ -3494,15 +3507,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean needLocation)
throws AccessControlException, UnresolvedLinkException, IOException {
DirectoryListing dl;
FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
if (dir.isDir(src)) {
checkPathAccess(src, FsAction.READ_EXECUTE);
checkPathAccess(pc, src, FsAction.READ_EXECUTE);
} else {
checkTraverse(src);
checkTraverse(pc, src);
}
}
if (isAuditEnabled() && isExternalInvocation()) {
@ -3803,9 +3817,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws IOException if
*/
void saveNamespace() throws AccessControlException, IOException {
checkSuperuserPrivilege();
readLock();
try {
checkSuperuserPrivilege();
if (!isInSafeMode()) {
throw new IOException("Safe mode should be turned ON " +
"in order to create namespace image.");
@ -3824,9 +3838,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* @throws AccessControlException if superuser privilege is violated.
*/
boolean restoreFailedStorage(String arg) throws AccessControlException {
checkSuperuserPrivilege();
writeLock();
try {
checkSuperuserPrivilege();
// if it is disabled - enable it and vice versa.
if(arg.equals("check"))
@ -3846,10 +3860,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
void finalizeUpgrade() throws IOException {
checkSuperuserPrivilege();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkSuperuserPrivilege();
getFSImage().finalizeUpgrade();
} finally {
writeUnlock();
@ -4585,10 +4599,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
CheckpointSignature rollEditLog() throws IOException {
checkSuperuserPrivilege();
writeLock();
try {
checkOperation(OperationCategory.JOURNAL);
checkSuperuserPrivilege();
if (isInSafeMode()) {
throw new SafeModeException("Log not rolled", safeMode);
}
@ -4639,61 +4653,64 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
}
private FSPermissionChecker checkOwner(String path
) throws AccessControlException, UnresolvedLinkException {
return checkPermission(path, true, null, null, null, null);
private void checkOwner(FSPermissionChecker pc, String path)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, true, null, null, null, null);
}
private FSPermissionChecker checkPathAccess(String path, FsAction access
) throws AccessControlException, UnresolvedLinkException {
return checkPermission(path, false, null, null, access, null);
private void checkPathAccess(FSPermissionChecker pc,
String path, FsAction access) throws AccessControlException,
UnresolvedLinkException {
checkPermission(pc, path, false, null, null, access, null);
}
private FSPermissionChecker checkParentAccess(String path, FsAction access
) throws AccessControlException, UnresolvedLinkException {
return checkPermission(path, false, null, access, null, null);
private void checkParentAccess(FSPermissionChecker pc,
String path, FsAction access) throws AccessControlException,
UnresolvedLinkException {
checkPermission(pc, path, false, null, access, null, null);
}
private FSPermissionChecker checkAncestorAccess(String path, FsAction access
) throws AccessControlException, UnresolvedLinkException {
return checkPermission(path, false, access, null, null, null);
private void checkAncestorAccess(FSPermissionChecker pc,
String path, FsAction access) throws AccessControlException,
UnresolvedLinkException {
checkPermission(pc, path, false, access, null, null, null);
}
private FSPermissionChecker checkTraverse(String path
) throws AccessControlException, UnresolvedLinkException {
return checkPermission(path, false, null, null, null, null);
private void checkTraverse(FSPermissionChecker pc, String path)
throws AccessControlException, UnresolvedLinkException {
checkPermission(pc, path, false, null, null, null, null);
}
@Override
public void checkSuperuserPrivilege() throws AccessControlException {
public void checkSuperuserPrivilege()
throws AccessControlException {
if (isPermissionEnabled) {
FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
FSPermissionChecker pc = getPermissionChecker();
pc.checkSuperuserPrivilege();
}
}
/**
* Check whether current user have permissions to access the path.
* For more details of the parameters, see
* {@link FSPermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
* Check whether current user have permissions to access the path. For more
* details of the parameters, see
* {@link FSPermissionChecker#checkPermission()}.
*/
private FSPermissionChecker checkPermission(String path, boolean doCheckOwner,
FsAction ancestorAccess, FsAction parentAccess, FsAction access,
FsAction subAccess) throws AccessControlException, UnresolvedLinkException {
FSPermissionChecker pc = new FSPermissionChecker(
fsOwner.getShortUserName(), supergroup);
if (!pc.isSuper) {
private void checkPermission(FSPermissionChecker pc,
String path, boolean doCheckOwner, FsAction ancestorAccess,
FsAction parentAccess, FsAction access, FsAction subAccess)
throws AccessControlException, UnresolvedLinkException {
if (!pc.isSuperUser()) {
dir.waitForReady();
readLock();
try {
pc.checkPermission(path, dir.rootDir, doCheckOwner,
ancestorAccess, parentAccess, access, subAccess);
pc.checkPermission(path, dir.rootDir, doCheckOwner, ancestorAccess,
parentAccess, access, subAccess);
} finally {
readUnlock();
}
}
}
return pc;
}
/**
* Check to see if we have exceeded the limit on the number
* of inodes.
@ -5137,16 +5154,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
String[] cookieTab) throws IOException {
checkSuperuserPrivilege();
readLock();
try {
checkOperation(OperationCategory.READ);
if (!isPopulatingReplQueues()) {
throw new IOException("Cannot run listCorruptFileBlocks because " +
"replication queues have not been initialized.");
}
checkSuperuserPrivilege();
// print a limited # of corrupt files per call
int count = 0;
ArrayList<CorruptFileBlockInfo> corruptFiles = new ArrayList<CorruptFileBlockInfo>();

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.Stack;
@ -31,14 +32,20 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
/** Perform permission checking in {@link FSNamesystem}. */
/**
* Class that helps in checking file system permission.
* The state of this class need not be synchronized as it has data structures that
* are read-only.
*
* Some of the helper methods are gaurded by {@link FSNamesystem#readLock()}.
*/
class FSPermissionChecker {
static final Log LOG = LogFactory.getLog(UserGroupInformation.class);
private final UserGroupInformation ugi;
public final String user;
private final Set<String> groups = new HashSet<String>();
public final boolean isSuper;
private final String user;
/** A set with group namess. Not synchronized since it is unmodifiable */
private final Set<String> groups;
private final boolean isSuper;
FSPermissionChecker(String fsOwner, String supergroup
) throws AccessControlException{
@ -47,10 +54,9 @@ class FSPermissionChecker {
} catch (IOException e) {
throw new AccessControlException(e);
}
groups.addAll(Arrays.asList(ugi.getGroupNames()));
HashSet<String> s = new HashSet<String>(Arrays.asList(ugi.getGroupNames()));
groups = Collections.unmodifiableSet(s);
user = ugi.getShortUserName();
isSuper = user.equals(fsOwner) || groups.contains(supergroup);
}
@ -60,20 +66,23 @@ class FSPermissionChecker {
*/
public boolean containsGroup(String group) {return groups.contains(group);}
public String getUser() {
return user;
}
public boolean isSuperUser() {
return isSuper;
}
/**
* Verify if the caller has the required permission. This will result into
* an exception if the caller is not allowed to access the resource.
* @param owner owner of the system
* @param supergroup supergroup of the system
*/
public static void checkSuperuserPrivilege(UserGroupInformation owner,
String supergroup)
throws AccessControlException {
FSPermissionChecker checker =
new FSPermissionChecker(owner.getShortUserName(), supergroup);
if (!checker.isSuper) {
public void checkSuperuserPrivilege()
throws AccessControlException {
if (!isSuper) {
throw new AccessControlException("Access denied for user "
+ checker.user + ". Superuser privilege is required");
+ user + ". Superuser privilege is required");
}
}
@ -103,9 +112,11 @@ class FSPermissionChecker {
* @param subAccess If path is a directory,
* it is the access required of the path and all the sub-directories.
* If path is not a directory, there is no effect.
* @return a PermissionChecker object which caches data for later use.
* @throws AccessControlException
* @throws UnresolvedLinkException
*
* Guarded by {@link FSNamesystem#readLock()}
* Caller of this method must hold that lock.
*/
void checkPermission(String path, INodeDirectory root, boolean doCheckOwner,
FsAction ancestorAccess, FsAction parentAccess, FsAction access,
@ -148,6 +159,7 @@ class FSPermissionChecker {
}
}
/** Guarded by {@link FSNamesystem#readLock()} */
private void checkOwner(INode inode) throws AccessControlException {
if (inode != null && user.equals(inode.getUserName())) {
return;
@ -155,6 +167,7 @@ class FSPermissionChecker {
throw new AccessControlException("Permission denied");
}
/** Guarded by {@link FSNamesystem#readLock()} */
private void checkTraverse(INode[] inodes, int last
) throws AccessControlException {
for(int j = 0; j <= last; j++) {
@ -162,6 +175,7 @@ class FSPermissionChecker {
}
}
/** Guarded by {@link FSNamesystem#readLock()} */
private void checkSubAccess(INode inode, FsAction access
) throws AccessControlException {
if (inode == null || !inode.isDirectory()) {
@ -181,11 +195,13 @@ class FSPermissionChecker {
}
}
/** Guarded by {@link FSNamesystem#readLock()} */
private void check(INode[] inodes, int i, FsAction access
) throws AccessControlException {
check(i >= 0? inodes[i]: null, access);
}
/** Guarded by {@link FSNamesystem#readLock()} */
private void check(INode inode, FsAction access
) throws AccessControlException {
if (inode == null) {
@ -206,7 +222,9 @@ class FSPermissionChecker {
+ ", access=" + access + ", inode=" + inode);
}
private void checkStickyBit(INode parent, INode inode) throws AccessControlException {
/** Guarded by {@link FSNamesystem#readLock()} */
private void checkStickyBit(INode parent, INode inode)
throws AccessControlException {
if(!parent.getFsPermission().getStickyBit()) {
return;
}