HDFS-13136. Avoid taking FSN lock while doing group member lookup for FSD permission check. Contributed by Xiaoyu Yao.

(cherry picked from commit e978e1d9064fc794ea83217d0af2315d4167211e)
This commit is contained in:
Xiaoyu Yao 2018-02-15 00:02:05 -08:00
parent c2bbe22c5a
commit ec1b445c62
17 changed files with 261 additions and 201 deletions

View File

@ -154,9 +154,10 @@ public class EncryptionZoneManager {
public void pauseForTestingAfterNthCheckpoint(final String zone,
final int count) throws IOException {
INodesInPath iip;
final FSPermissionChecker pc = dir.getPermissionChecker();
dir.readLock();
try {
iip = dir.resolvePath(dir.getPermissionChecker(), zone, DirOp.READ);
iip = dir.resolvePath(pc, zone, DirOp.READ);
} finally {
dir.readUnlock();
}

View File

@ -36,11 +36,10 @@ import java.util.List;
class FSDirAclOp {
static FileStatus modifyAclEntries(
FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
List<AclEntry> aclSpec) throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -61,11 +60,10 @@ class FSDirAclOp {
}
static FileStatus removeAclEntries(
FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
List<AclEntry> aclSpec) throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -85,11 +83,10 @@ class FSDirAclOp {
return fsd.getAuditFileInfo(iip);
}
static FileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg)
throws IOException {
static FileStatus removeDefaultAcl(FSDirectory fsd, FSPermissionChecker pc,
final String srcArg) throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -109,11 +106,10 @@ class FSDirAclOp {
return fsd.getAuditFileInfo(iip);
}
static FileStatus removeAcl(FSDirectory fsd, final String srcArg)
throws IOException {
static FileStatus removeAcl(FSDirectory fsd, FSPermissionChecker pc,
final String srcArg) throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -129,11 +125,10 @@ class FSDirAclOp {
}
static FileStatus setAcl(
FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
List<AclEntry> aclSpec) throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -148,9 +143,8 @@ class FSDirAclOp {
}
static AclStatus getAclStatus(
FSDirectory fsd, String src) throws IOException {
FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.readLock();
try {
INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);

View File

@ -148,7 +148,7 @@ final class FSDirAppendOp {
fsd.writeUnlock();
}
HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, iip);
HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, pc, iip);
if (lb != null) {
NameNode.stateChangeLog.debug(
"DIR* NameSystem.appendFile: file {} for {} at {} block {} block"

View File

@ -51,12 +51,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
public class FSDirAttrOp {
static FileStatus setPermission(
FSDirectory fsd, final String src, FsPermission permission)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, final String src,
FsPermission permission) throws IOException {
if (FSDirectory.isExactReservedName(src)) {
throw new InvalidPathException(src);
}
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -71,12 +70,11 @@ public class FSDirAttrOp {
}
static FileStatus setOwner(
FSDirectory fsd, String src, String username, String group)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, String src, String username,
String group) throws IOException {
if (FSDirectory.isExactReservedName(src)) {
throw new InvalidPathException(src);
}
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -101,10 +99,8 @@ public class FSDirAttrOp {
}
static FileStatus setTimes(
FSDirectory fsd, String src, long mtime, long atime)
throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
FSDirectory fsd, FSPermissionChecker pc, String src, long mtime,
long atime) throws IOException {
INodesInPath iip;
fsd.writeLock();
try {
@ -129,11 +125,10 @@ public class FSDirAttrOp {
}
static boolean setReplication(
FSDirectory fsd, BlockManager bm, String src, final short replication)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, BlockManager bm, String src,
final short replication) throws IOException {
bm.verifyReplication(src, replication, null);
final boolean isFile;
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.writeLock();
try {
final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE);
@ -153,33 +148,31 @@ public class FSDirAttrOp {
return isFile;
}
static FileStatus unsetStoragePolicy(FSDirectory fsd, BlockManager bm,
String src) throws IOException {
return setStoragePolicy(fsd, bm, src,
static FileStatus unsetStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
BlockManager bm, String src) throws IOException {
return setStoragePolicy(fsd, pc, bm, src,
HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, "unset");
}
static FileStatus setStoragePolicy(FSDirectory fsd, BlockManager bm,
String src, final String policyName) throws IOException {
static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
BlockManager bm, String src, final String policyName) throws IOException {
// get the corresponding policy and make sure the policy name is valid
BlockStoragePolicy policy = bm.getStoragePolicy(policyName);
if (policy == null) {
throw new HadoopIllegalArgumentException(
"Cannot find a block policy with the name " + policyName);
}
return setStoragePolicy(fsd, bm, src, policy.getId(), "set");
return setStoragePolicy(fsd, pc, bm, src, policy.getId(), "set");
}
static FileStatus setStoragePolicy(FSDirectory fsd, BlockManager bm,
String src, final byte policyId, final String operation)
static FileStatus setStoragePolicy(FSDirectory fsd, FSPermissionChecker pc,
BlockManager bm, String src, final byte policyId, final String operation)
throws IOException {
if (!fsd.isStoragePolicyEnabled()) {
throw new IOException(String.format(
"Failed to %s storage policy since %s is set to false.", operation,
DFS_STORAGE_POLICY_ENABLED_KEY));
}
FSPermissionChecker pc = fsd.getPermissionChecker();
INodesInPath iip;
fsd.writeLock();
try {
@ -202,9 +195,8 @@ public class FSDirAttrOp {
return bm.getStoragePolicies();
}
static BlockStoragePolicy getStoragePolicy(FSDirectory fsd, BlockManager bm,
String path) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
static BlockStoragePolicy getStoragePolicy(FSDirectory fsd,
FSPermissionChecker pc, BlockManager bm, String path) throws IOException {
fsd.readLock();
try {
final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ_LINK);
@ -222,9 +214,8 @@ public class FSDirAttrOp {
}
}
static long getPreferredBlockSize(FSDirectory fsd, String src)
throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
static long getPreferredBlockSize(FSDirectory fsd, FSPermissionChecker pc,
String src) throws IOException {
fsd.readLock();
try {
final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
@ -240,9 +231,8 @@ public class FSDirAttrOp {
*
* Note: This does not support ".inodes" relative path.
*/
static void setQuota(FSDirectory fsd, String src, long nsQuota, long ssQuota,
StorageType type) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
static void setQuota(FSDirectory fsd, FSPermissionChecker pc, String src,
long nsQuota, long ssQuota, StorageType type) throws IOException {
if (fsd.isPermissionEnabled()) {
pc.checkSuperuserPrivilege();
}

View File

@ -48,14 +48,13 @@ import static org.apache.hadoop.util.Time.now;
*/
class FSDirConcatOp {
static FileStatus concat(FSDirectory fsd, String target, String[] srcs,
boolean logRetryCache) throws IOException {
static FileStatus concat(FSDirectory fsd, FSPermissionChecker pc,
String target, String[] srcs, boolean logRetryCache) throws IOException {
validatePath(target, srcs);
assert srcs != null;
if (FSDirectory.LOG.isDebugEnabled()) {
FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
}
FSPermissionChecker pc = fsd.getPermissionChecker();
final INodesInPath targetIIP = fsd.resolvePath(pc, target, DirOp.WRITE);
// write permission for the target
if (fsd.isPermissionEnabled()) {

View File

@ -88,6 +88,7 @@ class FSDirDeleteOp {
* For small directory or file the deletion is done in one shot.
*
* @param fsn namespace
* @param pc FS permission checker
* @param src path name to be deleted
* @param recursive boolean true to apply to all sub-directories recursively
* @param logRetryCache whether to record RPC ids in editlog for retry cache
@ -96,10 +97,9 @@ class FSDirDeleteOp {
* @throws IOException
*/
static BlocksMapUpdateInfo delete(
FSNamesystem fsn, String src, boolean recursive, boolean logRetryCache)
throws IOException {
FSNamesystem fsn, FSPermissionChecker pc, String src, boolean recursive,
boolean logRetryCache) throws IOException {
FSDirectory fsd = fsn.getFSDirectory();
FSPermissionChecker pc = fsd.getPermissionChecker();
if (FSDirectory.isExactReservedName(src)) {
throw new InvalidPathException(src);
@ -130,7 +130,7 @@ class FSDirDeleteOp {
* <br>
*
* @param fsd the FSDirectory instance
* @param src a string representation of a path to an inode
* @param iip inodes of a path to be deleted
* @param mtime the time the inode is removed
*/
static void deleteForEditLog(FSDirectory fsd, INodesInPath iip, long mtime)

View File

@ -693,11 +693,12 @@ final class FSDirEncryptionZoneOp {
* a different ACL. HDFS should not try to operate on additional ACLs, but
* rather use the generate ACL it already has.
*/
static String getCurrentKeyVersion(final FSDirectory dir, final String zone)
throws IOException {
static String getCurrentKeyVersion(final FSDirectory dir,
final FSPermissionChecker pc, final String zone) throws IOException {
assert dir.getProvider() != null;
assert !dir.hasReadLock();
final String keyName = FSDirEncryptionZoneOp.getKeyNameForZone(dir, zone);
final String keyName = FSDirEncryptionZoneOp.getKeyNameForZone(dir,
pc, zone);
if (keyName == null) {
throw new IOException(zone + " is not an encryption zone.");
}
@ -719,11 +720,10 @@ final class FSDirEncryptionZoneOp {
* Resolve the zone to an inode, find the encryption zone info associated with
* that inode, and return the key name. Does not contact the KMS.
*/
static String getKeyNameForZone(final FSDirectory dir, final String zone)
throws IOException {
static String getKeyNameForZone(final FSDirectory dir,
final FSPermissionChecker pc, final String zone) throws IOException {
assert dir.getProvider() != null;
final INodesInPath iip;
final FSPermissionChecker pc = dir.getPermissionChecker();
dir.readLock();
try {
iip = dir.resolvePath(pc, zone, DirOp.READ);

View File

@ -39,13 +39,12 @@ import static org.apache.hadoop.util.Time.now;
class FSDirMkdirOp {
static FileStatus mkdirs(FSNamesystem fsn, String src,
static FileStatus mkdirs(FSNamesystem fsn, FSPermissionChecker pc, String src,
PermissionStatus permissions, boolean createParent) throws IOException {
FSDirectory fsd = fsn.getFSDirectory();
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
}
FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.writeLock();
try {
INodesInPath iip = fsd.resolvePath(pc, src, DirOp.CREATE);

View File

@ -47,14 +47,12 @@ import static org.apache.hadoop.hdfs.protocol.FSLimitException.PathComponentTooL
class FSDirRenameOp {
@Deprecated
static RenameResult renameToInt(
FSDirectory fsd, final String src, final String dst,
boolean logRetryCache)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, final String src,
final String dst, boolean logRetryCache) throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src +
" to " + dst);
}
FSPermissionChecker pc = fsd.getPermissionChecker();
// Rename does not operate on link targets
// Do not resolveLink when checking permissions of src and dst
@ -230,8 +228,8 @@ class FSDirRenameOp {
* The new rename which has the POSIX semantic.
*/
static RenameResult renameToInt(
FSDirectory fsd, final String srcArg, final String dstArg,
boolean logRetryCache, Options.Rename... options)
FSDirectory fsd, FSPermissionChecker pc, final String srcArg,
final String dstArg, boolean logRetryCache, Options.Rename... options)
throws IOException {
String src = srcArg;
String dst = dstArg;
@ -239,7 +237,6 @@ class FSDirRenameOp {
NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: with options -" +
" " + src + " to " + dst);
}
final FSPermissionChecker pc = fsd.getPermissionChecker();
BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
// returns resolved path

View File

@ -78,14 +78,17 @@ class FSDirSnapshotOp {
/**
* Create a snapshot
* @param fsd FS directory
* @param pc FS permission checker
* @param snapshotRoot The directory path where the snapshot is taken
* @param snapshotName The name of the snapshot
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding.
*/
static String createSnapshot(
FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
String snapshotName, boolean logRetryCache)
FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager,
String snapshotRoot, String snapshotName, boolean logRetryCache)
throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
if (fsd.isPermissionEnabled()) {
fsd.checkOwner(pc, iip);
@ -113,10 +116,9 @@ class FSDirSnapshotOp {
return snapshotPath;
}
static void renameSnapshot(FSDirectory fsd, SnapshotManager snapshotManager,
String path, String snapshotOldName, String snapshotNewName,
boolean logRetryCache) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
static void renameSnapshot(FSDirectory fsd, FSPermissionChecker pc,
SnapshotManager snapshotManager, String path, String snapshotOldName,
String snapshotNewName, boolean logRetryCache) throws IOException {
final INodesInPath iip = fsd.resolvePath(pc, path, DirOp.WRITE);
if (fsd.isPermissionEnabled()) {
fsd.checkOwner(pc, iip);
@ -134,8 +136,8 @@ class FSDirSnapshotOp {
}
static SnapshottableDirectoryStatus[] getSnapshottableDirListing(
FSDirectory fsd, SnapshotManager snapshotManager) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager)
throws IOException {
fsd.readLock();
try {
final String user = pc.isSuperUser()? null : pc.getUser();
@ -146,10 +148,9 @@ class FSDirSnapshotOp {
}
static SnapshotDiffReport getSnapshotDiffReport(FSDirectory fsd,
SnapshotManager snapshotManager, String path,
FSPermissionChecker pc, SnapshotManager snapshotManager, String path,
String fromSnapshot, String toSnapshot) throws IOException {
SnapshotDiffReport diffs;
final FSPermissionChecker pc = fsd.getPermissionChecker();
fsd.readLock();
try {
INodesInPath iip = fsd.resolvePath(pc, path, DirOp.READ);
@ -199,15 +200,19 @@ class FSDirSnapshotOp {
/**
* Delete a snapshot of a snapshottable directory
* @param fsd The FS directory
* @param pc The permission checker
* @param snapshotManager The snapshot manager
* @param snapshotRoot The snapshottable directory
* @param snapshotName The name of the to-be-deleted snapshot
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding.
* @throws IOException
*/
static INode.BlocksMapUpdateInfo deleteSnapshot(
FSDirectory fsd, SnapshotManager snapshotManager, String snapshotRoot,
String snapshotName, boolean logRetryCache)
FSDirectory fsd, FSPermissionChecker pc, SnapshotManager snapshotManager,
String snapshotRoot, String snapshotName, boolean logRetryCache)
throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
final INodesInPath iip = fsd.resolvePath(pc, snapshotRoot, DirOp.WRITE);
if (fsd.isPermissionEnabled()) {
fsd.checkOwner(pc, iip);

View File

@ -51,9 +51,9 @@ import java.util.EnumSet;
import static org.apache.hadoop.util.Time.now;
class FSDirStatAndListingOp {
static DirectoryListing getListingInt(FSDirectory fsd, final String srcArg,
byte[] startAfter, boolean needLocation) throws IOException {
final FSPermissionChecker pc = fsd.getPermissionChecker();
static DirectoryListing getListingInt(FSDirectory fsd, FSPermissionChecker pc,
final String srcArg, byte[] startAfter, boolean needLocation)
throws IOException {
final INodesInPath iip = fsd.resolvePath(pc, srcArg, DirOp.READ);
// Get file name when startAfter is an INodePath. This is not the
@ -85,7 +85,8 @@ class FSDirStatAndListingOp {
/**
* Get the file info for a specific file.
*
* @param fsd The FS directory
* @param pc The permission checker
* @param srcArg The string representation of the path to the file
* @param resolveLink whether to throw UnresolvedLinkException
* if src refers to a symlink
@ -93,11 +94,9 @@ class FSDirStatAndListingOp {
* @return object containing information regarding the file
* or null if file not found
*/
static HdfsFileStatus getFileInfo(
FSDirectory fsd, String srcArg, boolean resolveLink)
throws IOException {
static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
String srcArg, boolean resolveLink) throws IOException {
DirOp dirOp = resolveLink ? DirOp.READ : DirOp.READ_LINK;
FSPermissionChecker pc = fsd.getPermissionChecker();
final INodesInPath iip;
if (pc.isSuperUser()) {
// superuser can only get an ACE if an existing ancestor is a file.
@ -111,25 +110,24 @@ class FSDirStatAndListingOp {
} else {
iip = fsd.resolvePath(pc, srcArg, dirOp);
}
return getFileInfo(fsd, iip);
return getFileInfo(fsd, pc, iip);
}
/**
* Returns true if the file is closed
*/
static boolean isFileClosed(FSDirectory fsd, String src) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
static boolean isFileClosed(FSDirectory fsd, FSPermissionChecker pc,
String src) throws IOException {
final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
return !INodeFile.valueOf(iip.getLastINode(), src).isUnderConstruction();
}
static ContentSummary getContentSummary(
FSDirectory fsd, String src) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ_LINK);
// getContentSummaryInt() call will check access (if enabled) when
// traversing all sub directories.
return getContentSummaryInt(fsd, iip);
return getContentSummaryInt(fsd, pc, iip);
}
/**
@ -322,12 +320,13 @@ class FSDirStatAndListingOp {
/** Get the file info for a specific file.
* @param fsd FSDirectory
* @param pc The permission checker
* @param iip The path to the file, the file is included
* @param includeStoragePolicy whether to include storage policy
* @return object containing information regarding the file
* or null if file not found
*/
static HdfsFileStatus getFileInfo(FSDirectory fsd,
static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
INodesInPath iip, boolean includeStoragePolicy) throws IOException {
fsd.readLock();
try {
@ -344,8 +343,8 @@ class FSDirStatAndListingOp {
}
}
static HdfsFileStatus getFileInfo(FSDirectory fsd, INodesInPath iip)
throws IOException {
static HdfsFileStatus getFileInfo(FSDirectory fsd, FSPermissionChecker pc,
INodesInPath iip) throws IOException {
fsd.readLock();
try {
HdfsFileStatus status = null;
@ -356,7 +355,7 @@ class FSDirStatAndListingOp {
status = FSDirectory.DOT_SNAPSHOT_DIR_STATUS;
}
} else {
status = getFileInfo(fsd, iip, true);
status = getFileInfo(fsd, pc, iip, true);
}
return status;
} finally {
@ -507,7 +506,7 @@ class FSDirStatAndListingOp {
}
private static ContentSummary getContentSummaryInt(FSDirectory fsd,
INodesInPath iip) throws IOException {
FSPermissionChecker pc, INodesInPath iip) throws IOException {
fsd.readLock();
try {
INode targetNode = iip.getLastINode();
@ -519,8 +518,7 @@ class FSDirStatAndListingOp {
// processed. 0 means disabled. I.e. blocking for the entire duration.
ContentSummaryComputationContext cscc =
new ContentSummaryComputationContext(fsd, fsd.getFSNamesystem(),
fsd.getContentCountLimit(), fsd.getContentSleepMicroSec(),
fsd.getPermissionChecker());
fsd.getContentCountLimit(), fsd.getContentSleepMicroSec(), pc);
ContentSummary cs = targetNode.computeAndConvertContentSummary(
iip.getPathSnapshotId(), cscc);
fsd.addYieldCount(cscc.getYieldCount());
@ -532,8 +530,7 @@ class FSDirStatAndListingOp {
}
static QuotaUsage getQuotaUsage(
FSDirectory fsd, String src) throws IOException {
FSPermissionChecker pc = fsd.getPermissionChecker();
FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
final INodesInPath iip;
fsd.readLock();
try {
@ -550,7 +547,7 @@ class FSDirStatAndListingOp {
return usage;
} else {
//If quota isn't set, fall back to getContentSummary.
return getContentSummaryInt(fsd, iip);
return getContentSummaryInt(fsd, pc, iip);
}
}

View File

@ -349,7 +349,7 @@ class FSDirWriteFileOp {
* {@link ClientProtocol#create}
*/
static HdfsFileStatus startFile(
FSNamesystem fsn, INodesInPath iip,
FSNamesystem fsn, FSPermissionChecker pc, INodesInPath iip,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
@ -408,7 +408,7 @@ class FSDirWriteFileOp {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return FSDirStatAndListingOp.getFileInfo(fsd, iip);
return FSDirStatAndListingOp.getFileInfo(fsd, pc, iip);
}
static INodeFile addFileForEditLog(

View File

@ -51,22 +51,27 @@ class FSDirXAttrOp {
/**
* Set xattr for a file or directory.
*
* @param fsd
* - FS directory
* @param pc
* - FS permission checker
* @param src
* - path on which it sets the xattr
* @param xAttr
* - xAttr details to set
* @param flag
* - xAttrs flags
* @param logRetryCache
* - whether to record RPC ids in editlog for retry cache
* rebuilding.
* @throws IOException
*/
static FileStatus setXAttr(
FSDirectory fsd, String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
boolean logRetryCache)
FSDirectory fsd, FSPermissionChecker pc, String src, XAttr xAttr,
EnumSet<XAttrSetFlag> flag, boolean logRetryCache)
throws IOException {
checkXAttrsConfigFlag(fsd);
checkXAttrSize(fsd, xAttr);
FSPermissionChecker pc = fsd.getPermissionChecker();
XAttrPermissionFilter.checkPermissionForApi(
pc, xAttr, FSDirectory.isReservedRawName(src));
List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
@ -85,12 +90,10 @@ class FSDirXAttrOp {
return fsd.getAuditFileInfo(iip);
}
static List<XAttr> getXAttrs(FSDirectory fsd, final String srcArg,
List<XAttr> xAttrs)
throws IOException {
static List<XAttr> getXAttrs(FSDirectory fsd, FSPermissionChecker pc,
final String srcArg, List<XAttr> xAttrs) throws IOException {
String src = srcArg;
checkXAttrsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
final boolean isRawPath = FSDirectory.isReservedRawName(src);
boolean getAll = xAttrs == null || xAttrs.isEmpty();
if (!getAll) {
@ -131,9 +134,8 @@ class FSDirXAttrOp {
}
static List<XAttr> listXAttrs(
FSDirectory fsd, String src) throws IOException {
FSDirectory fsd, FSPermissionChecker pc, String src) throws IOException {
FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
final FSPermissionChecker pc = fsd.getPermissionChecker();
final boolean isRawPath = FSDirectory.isReservedRawName(src);
final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.READ);
if (fsd.isPermissionEnabled()) {
@ -146,18 +148,23 @@ class FSDirXAttrOp {
/**
* Remove an xattr for a file or directory.
*
* @param fsd
* - FS direcotry
* @param pc
* - FS permission checker
* @param src
* - path to remove the xattr from
* @param xAttr
* - xAttr to remove
* @param logRetryCache
* - whether to record RPC ids in editlog for retry cache
* rebuilding.
* @throws IOException
*/
static FileStatus removeXAttr(
FSDirectory fsd, String src, XAttr xAttr, boolean logRetryCache)
throws IOException {
FSDirectory fsd, FSPermissionChecker pc, String src, XAttr xAttr,
boolean logRetryCache) throws IOException {
FSDirXAttrOp.checkXAttrsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
XAttrPermissionFilter.checkPermissionForApi(
pc, xAttr, FSDirectory.isReservedRawName(src));

View File

@ -1850,11 +1850,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "setPermission";
FileStatus auditStat;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set permission for " + src);
auditStat = FSDirAttrOp.setPermission(dir, src, permission);
auditStat = FSDirAttrOp.setPermission(dir, pc, src, permission);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -1874,11 +1875,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "setOwner";
FileStatus auditStat;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set owner for " + src);
auditStat = FSDirAttrOp.setOwner(dir, src, username, group);
auditStat = FSDirAttrOp.setOwner(dir, pc, src, username, group);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -1898,7 +1900,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "open";
checkOperation(OperationCategory.READ);
GetBlockLocationsResult res = null;
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
@ -2011,11 +2013,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "concat";
FileStatus stat = null;
boolean success = false;
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot concat " + target);
stat = FSDirConcatOp.concat(dir, target, srcs, logRetryCache);
stat = FSDirConcatOp.concat(dir, pc, target, srcs, logRetryCache);
success = true;
} catch (AccessControlException ace) {
logAuditEvent(success, operationName, Arrays.toString(srcs),
@ -2039,11 +2042,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "setTimes";
FileStatus auditStat;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set times " + src);
auditStat = FSDirAttrOp.setTimes(dir, src, mtime, atime);
auditStat = FSDirAttrOp.setTimes(dir, pc, src, mtime, atime);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -2077,8 +2081,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new HadoopIllegalArgumentException(
"Cannot truncate to a negative file size: " + newLength + ".");
}
final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
BlocksMapUpdateInfo toRemoveBlocks = new BlocksMapUpdateInfo();
try {
@ -2147,11 +2151,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "setReplication";
boolean success = false;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set replication for " + src);
success = FSDirAttrOp.setReplication(dir, blockManager, src, replication);
success = FSDirAttrOp.setReplication(dir, pc, blockManager, src,
replication);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -2175,11 +2181,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "setStoragePolicy";
FileStatus auditStat;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set storage policy for " + src);
auditStat = FSDirAttrOp.setStoragePolicy(dir, blockManager, src,
auditStat = FSDirAttrOp.setStoragePolicy(dir, pc, blockManager, src,
policyName);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
@ -2200,11 +2207,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "unsetStoragePolicy";
FileStatus auditStat;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot unset storage policy for " + src);
auditStat = FSDirAttrOp.unsetStoragePolicy(dir, blockManager, src);
auditStat = FSDirAttrOp.unsetStoragePolicy(dir, pc, blockManager, src);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -2223,10 +2231,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
BlockStoragePolicy getStoragePolicy(String src) throws IOException {
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
return FSDirAttrOp.getStoragePolicy(dir, blockManager, src);
return FSDirAttrOp.getStoragePolicy(dir, pc, blockManager, src);
} finally {
readUnlock("getStoragePolicy");
}
@ -2248,10 +2257,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
long getPreferredBlockSize(String src) throws IOException {
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
return FSDirAttrOp.getPreferredBlockSize(dir, src);
return FSDirAttrOp.getPreferredBlockSize(dir, pc, src);
} finally {
readUnlock("getPreferredBlockSize");
}
@ -2355,13 +2365,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
"ecPolicyName are exclusive parameters. Set both is not allowed!");
}
FSPermissionChecker pc = getPermissionChecker();
INodesInPath iip = null;
boolean skipSync = true; // until we do something that might create edits
HdfsFileStatus stat = null;
BlocksMapUpdateInfo toRemoveBlocks = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2402,7 +2412,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
toRemoveBlocks = new BlocksMapUpdateInfo();
dir.writeLock();
try {
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
stat = FSDirWriteFileOp.startFile(this, pc, iip, permissions, holder,
clientMachine, flag, createParent, replication, blockSize, feInfo,
toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
} catch (IOException e) {
@ -2442,8 +2452,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean recoverLease(String src, String holder, String clientMachine)
throws IOException {
boolean skipSync = false;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2582,8 +2592,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
boolean skipSync = false;
LastBlockWithStatus lbs = null;
final FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2638,8 +2648,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FSDirWriteFileOp.ValidateAddBlockResult r;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
@ -2689,7 +2699,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final List<DatanodeStorageInfo> chosen;
final BlockType blockType;
checkOperation(OperationCategory.READ);
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
@ -2737,7 +2747,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.abandonBlock: {} of file {}", b, src);
checkOperation(OperationCategory.WRITE);
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2802,7 +2812,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
boolean success = false;
checkOperation(OperationCategory.WRITE);
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2880,11 +2890,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
final String operationName = "rename";
FSDirRenameOp.RenameResult ret = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename " + src);
ret = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache);
ret = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src, dst, null);
throw e;
@ -2904,11 +2916,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
final String operationName = "rename";
FSDirRenameOp.RenameResult res = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename " + src);
res = FSDirRenameOp.renameToInt(dir, src, dst, logRetryCache, options);
res = FSDirRenameOp.renameToInt(dir, pc, src, dst, logRetryCache,
options);
} catch (AccessControlException e) {
logAuditEvent(false, operationName + " (options=" +
Arrays.toString(options) + ")", src, dst, null);
@ -2939,13 +2954,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
final String operationName = "delete";
BlocksMapUpdateInfo toRemovedBlocks = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
boolean ret = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot delete " + src);
toRemovedBlocks = FSDirDeleteOp.delete(
this, src, recursive, logRetryCache);
this, pc, src, recursive, logRetryCache);
ret = toRemovedBlocks != null;
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
@ -3039,10 +3056,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "getfileinfo";
checkOperation(OperationCategory.READ);
HdfsFileStatus stat = null;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
stat = FSDirStatAndListingOp.getFileInfo(dir, src, resolveLink);
stat = FSDirStatAndListingOp.getFileInfo(
dir, pc, src, resolveLink);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -3059,10 +3078,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean isFileClosed(final String src) throws IOException {
final String operationName = "isFileClosed";
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
return FSDirStatAndListingOp.isFileClosed(dir, src);
return FSDirStatAndListingOp.isFileClosed(dir, pc, src);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -3079,11 +3099,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "mkdirs";
FileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create directory " + src);
auditStat = FSDirMkdirOp.mkdirs(this, src, permissions, createParent);
auditStat = FSDirMkdirOp.mkdirs(this, pc, src, permissions,
createParent);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -3112,12 +3134,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
ContentSummary getContentSummary(final String src) throws IOException {
checkOperation(OperationCategory.READ);
final String operationName = "contentSummary";
readLock();
boolean success = true;
ContentSummary cs;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
cs = FSDirStatAndListingOp.getContentSummary(dir, src);
cs = FSDirStatAndListingOp.getContentSummary(dir, pc, src);
} catch (AccessControlException ace) {
success = false;
logAuditEvent(success, operationName, src);
@ -3147,11 +3170,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkOperation(OperationCategory.READ);
final String operationName = "quotaUsage";
QuotaUsage quotaUsage;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
boolean success = true;
try {
checkOperation(OperationCategory.READ);
quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, src);
quotaUsage = FSDirStatAndListingOp.getQuotaUsage(dir, pc, src);
} catch (AccessControlException ace) {
success = false;
logAuditEvent(success, operationName, src);
@ -3177,12 +3201,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
checkOperation(OperationCategory.WRITE);
final String operationName = "setQuota";
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
boolean success = false;
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set quota on " + src);
FSDirAttrOp.setQuota(dir, src, nsQuota, ssQuota, type);
FSDirAttrOp.setQuota(dir, pc, src, nsQuota, ssQuota, type);
success = true;
} catch (AccessControlException ace) {
logAuditEvent(success, operationName, src);
@ -3209,8 +3234,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
checkOperation(OperationCategory.WRITE);
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3714,10 +3738,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkOperation(OperationCategory.READ);
final String operationName = "listStatus";
DirectoryListing dl = null;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(NameNode.OperationCategory.READ);
dl = getListingInt(dir, src, startAfter, needLocation);
dl = getListingInt(dir, pc, src, startAfter, needLocation);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -4621,6 +4646,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
void checkSuperuserPrivilege(FSPermissionChecker pc)
throws AccessControlException {
if (isPermissionEnabled) {
pc.checkSuperuserPrivilege();
}
}
/**
* Check to see if we have exceeded the limit on the number
* of inodes.
@ -6303,14 +6335,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
String createSnapshot(String snapshotRoot, String snapshotName,
boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final String operationName = "createSnapshot";
String snapshotPath = null;
boolean success = false;
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create snapshot for " + snapshotRoot);
snapshotPath = FSDirSnapshotOp.createSnapshot(dir,
snapshotPath = FSDirSnapshotOp.createSnapshot(dir, pc,
snapshotManager, snapshotRoot, snapshotName, logRetryCache);
success = true;
} catch (AccessControlException ace) {
@ -6337,15 +6371,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void renameSnapshot(
String path, String snapshotOldName, String snapshotNewName,
boolean logRetryCache) throws IOException {
checkOperation(OperationCategory.WRITE);
final String operationName = "renameSnapshot";
boolean success = false;
String oldSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotOldName);
String newSnapshotRoot = Snapshot.getSnapshotPath(path, snapshotNewName);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename snapshot for " + path);
FSDirSnapshotOp.renameSnapshot(dir, snapshotManager, path,
FSDirSnapshotOp.renameSnapshot(dir, pc, snapshotManager, path,
snapshotOldName, snapshotNewName, logRetryCache);
success = true;
} catch (AccessControlException ace) {
@ -6373,10 +6409,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
SnapshottableDirectoryStatus[] status = null;
checkOperation(OperationCategory.READ);
boolean success = false;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
status = FSDirSnapshotOp.getSnapshottableDirListing(dir, snapshotManager);
status = FSDirSnapshotOp.getSnapshottableDirListing(dir, pc,
snapshotManager);
success = true;
} catch (AccessControlException ace) {
logAuditEvent(success, operationName, null, null, null);
@ -6413,10 +6451,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
path : Snapshot.getSnapshotPath(path, fromSnapshot);
String toSnapshotRoot = (toSnapshot == null || toSnapshot.isEmpty()) ?
path : Snapshot.getSnapshotPath(path, toSnapshot);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, snapshotManager,
diffs = FSDirSnapshotOp.getSnapshotDiffReport(dir, pc, snapshotManager,
path, fromSnapshot, toSnapshot);
success = true;
} catch (AccessControlException ace) {
@ -6430,7 +6469,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
toSnapshotRoot, null);
return diffs;
}
/**
* Delete a snapshot of a snapshottable directory
* @param snapshotRoot The snapshottable directory
@ -6443,14 +6482,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "deleteSnapshot";
boolean success = false;
String rootPath = null;
writeLock();
BlocksMapUpdateInfo blocksToBeDeleted = null;
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot delete snapshot for " + snapshotRoot);
rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, snapshotManager,
snapshotRoot, snapshotName, logRetryCache);
blocksToBeDeleted = FSDirSnapshotOp.deleteSnapshot(dir, pc,
snapshotManager, snapshotRoot, snapshotName, logRetryCache);
success = true;
} catch (AccessControlException ace) {
logAuditEvent(success, operationName, rootPath, null, null);
@ -6930,11 +6970,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "modifyAclEntries";
FileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec);
auditStat = FSDirAclOp.modifyAclEntries(dir, pc, src, aclSpec);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -6950,11 +6991,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "removeAclEntries";
checkOperation(OperationCategory.WRITE);
FileStatus auditStat = null;
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec);
auditStat = FSDirAclOp.removeAclEntries(dir, pc, src, aclSpec);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -6969,11 +7011,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "removeDefaultAcl";
FileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
auditStat = FSDirAclOp.removeDefaultAcl(dir, src);
auditStat = FSDirAclOp.removeDefaultAcl(dir, pc, src);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -6988,11 +7031,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "removeAcl";
FileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL on " + src);
auditStat = FSDirAclOp.removeAcl(dir, src);
auditStat = FSDirAclOp.removeAcl(dir, pc, src);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -7007,11 +7051,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "setAcl";
FileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set ACL on " + src);
auditStat = FSDirAclOp.setAcl(dir, src, aclSpec);
auditStat = FSDirAclOp.setAcl(dir, pc, src, aclSpec);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -7026,10 +7071,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "getAclStatus";
checkOperation(OperationCategory.READ);
final AclStatus ret;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
ret = FSDirAclOp.getAclStatus(dir, src);
ret = FSDirAclOp.getAclStatus(dir, pc, src);
} catch(AccessControlException ace) {
logAuditEvent(false, operationName, src);
throw ace;
@ -7058,13 +7104,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
Metadata metadata = FSDirEncryptionZoneOp.ensureKeyIsInitialized(dir,
keyName, src);
checkSuperuserPrivilege();
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
checkSuperuserPrivilege(pc);
checkOperation(OperationCategory.WRITE);
final FileStatus resultingStat;
writeLock();
try {
checkSuperuserPrivilege();
checkSuperuserPrivilege(pc);
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create encryption zone on " + src);
resultingStat = FSDirEncryptionZoneOp.createEncryptionZone(dir, src,
@ -7119,12 +7165,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
final String operationName = "listEncryptionZones";
boolean success = false;
checkSuperuserPrivilege();
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
checkSuperuserPrivilege(pc);
readLock();
try {
checkSuperuserPrivilege();
checkOperation(OperationCategory.READ);
checkSuperuserPrivilege(pc);
final BatchedListEntries<EncryptionZone> ret =
FSDirEncryptionZoneOp.listEncryptionZones(dir, prevId);
success = true;
@ -7140,11 +7187,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean success = false;
try {
Preconditions.checkNotNull(zone, "zone is null.");
checkSuperuserPrivilege();
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = dir.getPermissionChecker();
checkSuperuserPrivilege(pc);
checkNameNodeSafeMode("NameNode in safemode, cannot " + action
+ " re-encryption on zone " + zone);
reencryptEncryptionZoneInt(zone, action, logRetryCache);
reencryptEncryptionZoneInt(pc, zone, action, logRetryCache);
success = true;
} finally {
logAuditEvent(success, action + "reencryption", zone, null, null);
@ -7155,12 +7203,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final long prevId) throws IOException {
final String operationName = "listReencryptionStatus";
boolean success = false;
checkSuperuserPrivilege();
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
checkSuperuserPrivilege(pc);
readLock();
try {
checkSuperuserPrivilege();
checkOperation(OperationCategory.READ);
checkSuperuserPrivilege(pc);
final BatchedListEntries<ZoneReencryptionStatus> ret =
FSDirEncryptionZoneOp.listReencryptionStatus(dir, prevId);
success = true;
@ -7171,9 +7220,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
private void reencryptEncryptionZoneInt(final String zone,
final ReencryptAction action, final boolean logRetryCache)
throws IOException {
private void reencryptEncryptionZoneInt(final FSPermissionChecker pc,
final String zone, final ReencryptAction action,
final boolean logRetryCache) throws IOException {
if (getProvider() == null) {
throw new IOException("No key provider configured, re-encryption "
+ "operation is rejected");
@ -7181,7 +7230,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
String keyVersionName = null;
if (action == ReencryptAction.START) {
// get zone's latest key version name out of the lock.
keyVersionName = FSDirEncryptionZoneOp.getCurrentKeyVersion(dir, zone);
keyVersionName =
FSDirEncryptionZoneOp.getCurrentKeyVersion(dir, pc, zone);
if (keyVersionName == null) {
throw new IOException("Failed to get key version name for " + zone);
}
@ -7190,11 +7240,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
writeLock();
try {
checkSuperuserPrivilege();
checkSuperuserPrivilege(pc);
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("NameNode in safemode, cannot " + action
+ " re-encryption on zone " + zone);
final FSPermissionChecker pc = dir.getPermissionChecker();
List<XAttr> xattrs;
dir.writeLock();
try {
@ -7429,7 +7478,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "getErasureCodingPolicy";
boolean success = false;
checkOperation(OperationCategory.READ);
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
@ -7488,11 +7537,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
final String operationName = "setXAttr";
FileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set XAttr on " + src);
auditStat = FSDirXAttrOp.setXAttr(dir, src, xAttr, flag, logRetryCache);
auditStat = FSDirXAttrOp.setXAttr(dir, pc, src, xAttr, flag,
logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -7508,10 +7560,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "getXAttrs";
checkOperation(OperationCategory.READ);
List<XAttr> fsXattrs;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
fsXattrs = FSDirXAttrOp.getXAttrs(dir, src, xAttrs);
fsXattrs = FSDirXAttrOp.getXAttrs(dir, pc, src, xAttrs);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -7526,10 +7579,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final String operationName = "listXAttrs";
checkOperation(OperationCategory.READ);
List<XAttr> fsXattrs;
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
fsXattrs = FSDirXAttrOp.listXAttrs(dir, src);
fsXattrs = FSDirXAttrOp.listXAttrs(dir, pc, src);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -7544,11 +7598,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throws IOException {
final String operationName = "removeXAttr";
FileStatus auditStat = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove XAttr entry on " + src);
auditStat = FSDirXAttrOp.removeXAttr(dir, src, xAttr, logRetryCache);
auditStat = FSDirXAttrOp.removeXAttr(dir, pc, src, xAttr, logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, operationName, src);
throw e;
@ -7562,7 +7618,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
void checkAccess(String src, FsAction mode) throws IOException {
final String operationName = "checkAccess";
checkOperation(OperationCategory.READ);
FSPermissionChecker pc = getPermissionChecker();
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
@ -7813,5 +7869,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
.size();
}
// This method logs operatoinName without super user privilege.
// It should be called without holding FSN lock.
void checkSuperuserPrivilege(String operationName)
throws IOException {
try {
checkSuperuserPrivilege();
} catch (AccessControlException ace) {
logAuditEvent(false, operationName, null);
throw ace;
}
}
}

View File

@ -72,12 +72,14 @@ public class NameNodeAdapter {
}
public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
boolean resolveLink) throws AccessControlException, UnresolvedLinkException,
StandbyException, IOException {
boolean resolveLink) throws AccessControlException,
UnresolvedLinkException, StandbyException, IOException {
final FSPermissionChecker pc =
namenode.getNamesystem().getPermissionChecker();
namenode.getNamesystem().readLock();
try {
return FSDirStatAndListingOp.getFileInfo(namenode.getNamesystem()
.getFSDirectory(), src, resolveLink);
.getFSDirectory(), pc, src, resolveLink);
} finally {
namenode.getNamesystem().readUnlock();
}

View File

@ -75,6 +75,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
/**
@ -412,7 +413,7 @@ public class TestAuditLogger {
final FSDirectory mockedDir = Mockito.spy(dir);
AccessControlException ex = new AccessControlException();
doThrow(ex).when(mockedDir).getPermissionChecker();
doThrow(ex).when(mockedDir).checkTraverse(any(), any(), any());
cluster.getNamesystem().setFSDirectory(mockedDir);
assertTrue(DummyAuditLogger.initialized);
DummyAuditLogger.resetLogCount();

View File

@ -46,8 +46,9 @@ import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.mockito.Mock;
import org.mockito.Mockito;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
public class TestAuditLoggerWithCommands {
@ -529,7 +530,7 @@ public class TestAuditLoggerWithCommands {
@Test
public void testListXattrs() throws Exception {
Path path = new Path("/test");
Path path = new Path("/testListXattrs");
fileSys = DFSTestUtil.getFileSystemAs(user1, conf);
fs.mkdirs(path);
fs.setOwner(path,user1.getUserName(),user1.getPrimaryGroupName());
@ -548,7 +549,7 @@ public class TestAuditLoggerWithCommands {
final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
final FSDirectory mockedDir = Mockito.spy(dir);
AccessControlException ex = new AccessControlException();
doThrow(ex).when(mockedDir).getPermissionChecker();
doThrow(ex).when(mockedDir).checkTraverse(any(), any(), any());
cluster.getNamesystem().setFSDirectory(mockedDir);
String aceGetAclStatus =
".*allowed=false.*ugi=theDoctor.*cmd=getAclStatus.*";