HDFS-7476. Consolidate ACL-related operations to a single class. Contributed by Haohui Mai.

(cherry picked from commit 9297f980c2)
This commit is contained in:
cnauroth 2014-12-06 14:20:00 -08:00
parent b65093c733
commit 00528ad1ad
7 changed files with 318 additions and 320 deletions

View File

@ -181,6 +181,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7459. Consolidate cache-related implementation in FSNamesystem into HDFS-7459. Consolidate cache-related implementation in FSNamesystem into
a single class. (wheat9) a single class. (wheat9)
HDFS-7476. Consolidate ACL-related operations to a single class.
(wheat9 via cnauroth)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode. HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -240,39 +240,6 @@ public static List<AclEntry> readINodeLogicalAcl(INode inode) {
return existingAcl; return existingAcl;
} }
/**
* Completely removes the ACL from an inode.
*
* @param inode INode to update
* @param snapshotId int latest snapshot ID of inode
* @throws QuotaExceededException if quota limit is exceeded
*/
public static void removeINodeAcl(INode inode, int snapshotId)
throws QuotaExceededException {
AclFeature f = inode.getAclFeature();
if (f == null) {
return;
}
FsPermission perm = inode.getFsPermission();
List<AclEntry> featureEntries = getEntriesFromAclFeature(f);
if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
// Restore group permissions from the feature's entry to permission
// bits, overwriting the mask, which is not part of a minimal ACL.
AclEntry groupEntryKey = new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build();
int groupEntryIndex = Collections.binarySearch(featureEntries,
groupEntryKey, AclTransformation.ACL_ENTRY_COMPARATOR);
assert groupEntryIndex >= 0;
FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission();
FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm,
perm.getOtherAction(), perm.getStickyBit());
inode.setPermission(newPerm, snapshotId);
}
inode.removeAclFeature(snapshotId);
}
/** /**
* Updates an inode with a new ACL. This method takes a full logical ACL and * Updates an inode with a new ACL. This method takes a full logical ACL and
* stores the entries to the inode's {@link FsPermission} and * stores the entries to the inode's {@link FsPermission} and

View File

@ -0,0 +1,244 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryScope;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.AclException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
class FSDirAclOp {
static HdfsFileStatus modifyAclEntries(
FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
fsd.writeLock();
try {
INodesInPath iip = fsd.getINodesInPath4Write(
FSDirectory.normalizePath(src), true);
fsd.checkOwner(pc, iip);
INode inode = FSDirectory.resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.mergeAclEntries(
existingAcl, aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
fsd.getEditLog().logSetAcl(src, newAcl);
} finally {
fsd.writeUnlock();
}
return fsd.getAuditFileInfo(src, false);
}
static HdfsFileStatus removeAclEntries(
FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
fsd.writeLock();
try {
INodesInPath iip = fsd.getINodesInPath4Write(
FSDirectory.normalizePath(src), true);
fsd.checkOwner(pc, iip);
INode inode = FSDirectory.resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
existingAcl, aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
fsd.getEditLog().logSetAcl(src, newAcl);
} finally {
fsd.writeUnlock();
}
return fsd.getAuditFileInfo(src, false);
}
static HdfsFileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg)
throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
fsd.writeLock();
try {
INodesInPath iip = fsd.getINodesInPath4Write(
FSDirectory.normalizePath(src), true);
fsd.checkOwner(pc, iip);
INode inode = FSDirectory.resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
existingAcl);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
fsd.getEditLog().logSetAcl(src, newAcl);
} finally {
fsd.writeUnlock();
}
return fsd.getAuditFileInfo(src, false);
}
static HdfsFileStatus removeAcl(FSDirectory fsd, final String srcArg)
throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
fsd.writeLock();
try {
INodesInPath iip = fsd.getINodesInPath4Write(src);
fsd.checkOwner(pc, iip);
unprotectedRemoveAcl(fsd, src);
} finally {
fsd.writeUnlock();
}
fsd.getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
return fsd.getAuditFileInfo(src, false);
}
static HdfsFileStatus setAcl(
FSDirectory fsd, final String srcArg, List<AclEntry> aclSpec)
throws IOException {
String src = srcArg;
checkAclsConfigFlag(fsd);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
FSPermissionChecker pc = fsd.getPermissionChecker();
src = fsd.resolvePath(pc, src, pathComponents);
fsd.writeLock();
try {
INodesInPath iip = fsd.getINodesInPath4Write(src);
fsd.checkOwner(pc, iip);
List<AclEntry> newAcl = unprotectedSetAcl(fsd, src, aclSpec);
fsd.getEditLog().logSetAcl(src, newAcl);
} finally {
fsd.writeUnlock();
}
return fsd.getAuditFileInfo(src, false);
}
static AclStatus getAclStatus(
FSDirectory fsd, String src) throws IOException {
checkAclsConfigFlag(fsd);
FSPermissionChecker pc = fsd.getPermissionChecker();
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
src = fsd.resolvePath(pc, src, pathComponents);
String srcs = FSDirectory.normalizePath(src);
fsd.readLock();
try {
// There is no real inode for the path ending in ".snapshot", so return a
// non-null, unpopulated AclStatus. This is similar to getFileInfo.
if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
fsd.getINode4DotSnapshot(srcs) != null) {
return new AclStatus.Builder().owner("").group("").build();
}
INodesInPath iip = fsd.getINodesInPath(srcs, true);
if (fsd.isPermissionEnabled()) {
fsd.checkTraverse(pc, iip);
}
INode inode = FSDirectory.resolveLastINode(srcs, iip);
int snapshotId = iip.getPathSnapshotId();
List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
return new AclStatus.Builder()
.owner(inode.getUserName()).group(inode.getGroupName())
.stickyBit(inode.getFsPermission(snapshotId).getStickyBit())
.addEntries(acl).build();
} finally {
fsd.readUnlock();
}
}
static List<AclEntry> unprotectedSetAcl(
FSDirectory fsd, String src, List<AclEntry> aclSpec)
throws IOException {
// ACL removal is logged to edits as OP_SET_ACL with an empty list.
if (aclSpec.isEmpty()) {
unprotectedRemoveAcl(fsd, src);
return AclFeature.EMPTY_ENTRY_LIST;
}
assert fsd.hasWriteLock();
INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath
(src), true);
INode inode = FSDirectory.resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
return newAcl;
}
private static void checkAclsConfigFlag(FSDirectory fsd) throws AclException {
if (!fsd.isAclsEnabled()) {
throw new AclException(String.format(
"The ACL operation has been rejected. "
+ "Support for ACLs has been disabled by setting %s to false.",
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
}
}
private static void unprotectedRemoveAcl(FSDirectory fsd, String src)
throws IOException {
assert fsd.hasWriteLock();
INodesInPath iip = fsd.getINodesInPath4Write(
FSDirectory.normalizePath(src), true);
INode inode = FSDirectory.resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
AclFeature f = inode.getAclFeature();
if (f == null) {
return;
}
FsPermission perm = inode.getFsPermission();
List<AclEntry> featureEntries = AclStorage.getEntriesFromAclFeature(f);
if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) {
// Restore group permissions from the feature's entry to permission
// bits, overwriting the mask, which is not part of a minimal ACL.
AclEntry groupEntryKey = new AclEntry.Builder()
.setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build();
int groupEntryIndex = Collections.binarySearch(
featureEntries, groupEntryKey,
AclTransformation.ACL_ENTRY_COMPARATOR);
assert groupEntryIndex >= 0;
FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission();
FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm,
perm.getOtherAction(), perm.getStickyBit());
inode.setPermission(newPerm, snapshotId);
}
inode.removeAclFeature(snapshotId);
}
}

View File

@ -48,7 +48,6 @@
import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
@ -143,6 +142,12 @@ private static INodeDirectory createRoot(FSNamesystem namesystem) {
private final ReentrantReadWriteLock dirLock; private final ReentrantReadWriteLock dirLock;
private final boolean isPermissionEnabled; private final boolean isPermissionEnabled;
/**
* Support for ACLs is controlled by a configuration flag. If the
* configuration flag is false, then the NameNode will reject all
* ACL-related operations.
*/
private final boolean aclsEnabled;
private final String fsOwnerShortUserName; private final String fsOwnerShortUserName;
private final String supergroup; private final String supergroup;
private final INodeId inodeId; private final INodeId inodeId;
@ -204,7 +209,10 @@ public int getWriteHoldCount() {
this.supergroup = conf.get( this.supergroup = conf.get(
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.aclsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
LOG.info("ACLs enabled? " + aclsEnabled);
int configuredLimit = conf.getInt( int configuredLimit = conf.getInt(
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ? this.lsLimit = configuredLimit>0 ?
@ -263,6 +271,9 @@ public INodeDirectory getRoot() {
boolean isPermissionEnabled() { boolean isPermissionEnabled() {
return isPermissionEnabled; return isPermissionEnabled;
} }
boolean isAclsEnabled() {
return aclsEnabled;
}
int getLsLimit() { int getLsLimit() {
return lsLimit; return lsLimit;
@ -1549,140 +1560,6 @@ INodeSymlink unprotectedAddSymlink(long id, String path, String target,
return addINode(path, symlink) ? symlink : null; return addINode(path, symlink) ? symlink : null;
} }
List<AclEntry> modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
writeLock();
try {
return unprotectedModifyAclEntries(src, aclSpec);
} finally {
writeUnlock();
}
}
private List<AclEntry> unprotectedModifyAclEntries(String src,
List<AclEntry> aclSpec) throws IOException {
assert hasWriteLock();
INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.mergeAclEntries(existingAcl,
aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
return newAcl;
}
List<AclEntry> removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException {
writeLock();
try {
return unprotectedRemoveAclEntries(src, aclSpec);
} finally {
writeUnlock();
}
}
private List<AclEntry> unprotectedRemoveAclEntries(String src,
List<AclEntry> aclSpec) throws IOException {
assert hasWriteLock();
INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.filterAclEntriesByAclSpec(
existingAcl, aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
return newAcl;
}
List<AclEntry> removeDefaultAcl(String src) throws IOException {
writeLock();
try {
return unprotectedRemoveDefaultAcl(src);
} finally {
writeUnlock();
}
}
private List<AclEntry> unprotectedRemoveDefaultAcl(String src)
throws IOException {
assert hasWriteLock();
INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.filterDefaultAclEntries(
existingAcl);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
return newAcl;
}
void removeAcl(String src) throws IOException {
writeLock();
try {
unprotectedRemoveAcl(src);
} finally {
writeUnlock();
}
}
private void unprotectedRemoveAcl(String src) throws IOException {
assert hasWriteLock();
INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
AclStorage.removeINodeAcl(inode, snapshotId);
}
List<AclEntry> setAcl(String src, List<AclEntry> aclSpec) throws IOException {
writeLock();
try {
return unprotectedSetAcl(src, aclSpec);
} finally {
writeUnlock();
}
}
List<AclEntry> unprotectedSetAcl(String src, List<AclEntry> aclSpec)
throws IOException {
// ACL removal is logged to edits as OP_SET_ACL with an empty list.
if (aclSpec.isEmpty()) {
unprotectedRemoveAcl(src);
return AclFeature.EMPTY_ENTRY_LIST;
}
assert hasWriteLock();
INodesInPath iip = getINodesInPath4Write(normalizePath(src), true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getLatestSnapshotId();
List<AclEntry> existingAcl = AclStorage.readINodeLogicalAcl(inode);
List<AclEntry> newAcl = AclTransformation.replaceAclEntries(existingAcl,
aclSpec);
AclStorage.updateINodeAcl(inode, newAcl, snapshotId);
return newAcl;
}
AclStatus getAclStatus(String src) throws IOException {
String srcs = normalizePath(src);
readLock();
try {
// There is no real inode for the path ending in ".snapshot", so return a
// non-null, unpopulated AclStatus. This is similar to getFileInfo.
if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) &&
getINode4DotSnapshot(srcs) != null) {
return new AclStatus.Builder().owner("").group("").build();
}
INodesInPath iip = getLastINodeInPath(srcs, true);
INode inode = resolveLastINode(src, iip);
int snapshotId = iip.getPathSnapshotId();
List<AclEntry> acl = AclStorage.readINodeAcl(inode, snapshotId);
return new AclStatus.Builder()
.owner(inode.getUserName()).group(inode.getGroupName())
.stickyBit(inode.getFsPermission(snapshotId).getStickyBit())
.addEntries(acl).build();
} finally {
readUnlock();
}
}
/** /**
* Removes a list of XAttrs from an inode at a path. * Removes a list of XAttrs from an inode at a path.
* *
@ -2065,9 +1942,10 @@ private XAttr unprotectedGetXAttrByName(INode inode, int snapshotId,
return null; return null;
} }
private static INode resolveLastINode(String src, INodesInPath iip) static INode resolveLastINode(String src, INodesInPath iip)
throws FileNotFoundException { throws FileNotFoundException {
INode inode = iip.getLastINode(); INode[] inodes = iip.getINodes();
INode inode = inodes[inodes.length - 1];
if (inode == null) if (inode == null)
throw new FileNotFoundException("cannot find " + src); throw new FileNotFoundException("cannot find " + src);
return inode; return inode;
@ -2246,8 +2124,8 @@ private static String constructRemainingPath(String pathPrefix,
} }
/** @return the {@link INodesInPath} containing only the last inode. */ /** @return the {@link INodesInPath} containing only the last inode. */
private INodesInPath getLastINodeInPath(String path, boolean resolveLink INodesInPath getLastINodeInPath(
) throws UnresolvedLinkException { String path, boolean resolveLink) throws UnresolvedLinkException {
return INodesInPath.resolve(rootDir, INode.getPathComponents(path), 1, return INodesInPath.resolve(rootDir, INode.getPathComponents(path), 1,
resolveLink); resolveLink);
} }

View File

@ -827,7 +827,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
} }
case OP_SET_ACL: { case OP_SET_ACL: {
SetAclOp setAclOp = (SetAclOp) op; SetAclOp setAclOp = (SetAclOp) op;
fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries); FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries);
break; break;
} }
case OP_SET_XATTR: { case OP_SET_XATTR: {

View File

@ -521,7 +521,6 @@ private void logAuditEvent(boolean succeeded,
private final RetryCache retryCache; private final RetryCache retryCache;
private final boolean aclsEnabled;
private final boolean xattrsEnabled; private final boolean xattrsEnabled;
private final int xattrMaxSize; private final int xattrMaxSize;
@ -842,10 +841,6 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
auditLoggers.get(0) instanceof DefaultAuditLogger; auditLoggers.get(0) instanceof DefaultAuditLogger;
this.retryCache = ignoreRetryCache ? null : initRetryCache(conf); this.retryCache = ignoreRetryCache ? null : initRetryCache(conf);
this.aclsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY,
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT);
LOG.info("ACLs enabled? " + aclsEnabled);
this.xattrsEnabled = conf.getBoolean( this.xattrsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY,
DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT); DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT);
@ -7731,158 +7726,105 @@ BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
return results; return results;
} }
void modifyAclEntries(final String srcArg, List<AclEntry> aclSpec) void modifyAclEntries(final String src, List<AclEntry> aclSpec)
throws IOException { throws IOException {
String src = srcArg; HdfsFileStatus auditStat = null;
checkAclsConfigFlag();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot modify ACL entries on " + src); checkNameNodeSafeMode("Cannot modify ACL entries on " + src);
src = dir.resolvePath(pc, src, pathComponents); auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec);
final INodesInPath iip = dir.getINodesInPath4Write(src);
dir.checkOwner(pc, iip);
List<AclEntry> newAcl = dir.modifyAclEntries(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "modifyAclEntries", srcArg); logAuditEvent(false, "modifyAclEntries", src);
throw e; throw e;
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(true, "modifyAclEntries", srcArg, null, resultingStat); logAuditEvent(true, "modifyAclEntries", src, null, auditStat);
} }
void removeAclEntries(final String srcArg, List<AclEntry> aclSpec) void removeAclEntries(final String src, List<AclEntry> aclSpec)
throws IOException { throws IOException {
String src = srcArg;
checkAclsConfigFlag();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); HdfsFileStatus auditStat = null;
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL entries on " + src); checkNameNodeSafeMode("Cannot remove ACL entries on " + src);
src = dir.resolvePath(pc, src, pathComponents); auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec);
final INodesInPath iip = dir.getINodesInPath4Write(src);
dir.checkOwner(pc, iip);
List<AclEntry> newAcl = dir.removeAclEntries(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "removeAclEntries", srcArg); logAuditEvent(false, "removeAclEntries", src);
throw e; throw e;
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(true, "removeAclEntries", srcArg, null, resultingStat); logAuditEvent(true, "removeAclEntries", src, null, auditStat);
} }
void removeDefaultAcl(final String srcArg) throws IOException { void removeDefaultAcl(final String src) throws IOException {
String src = srcArg; HdfsFileStatus auditStat = null;
checkAclsConfigFlag();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove default ACL entries on " + src); checkNameNodeSafeMode("Cannot remove default ACL entries on " + src);
src = dir.resolvePath(pc, src, pathComponents); auditStat = FSDirAclOp.removeDefaultAcl(dir, src);
final INodesInPath iip = dir.getINodesInPath4Write(src);
dir.checkOwner(pc, iip);
List<AclEntry> newAcl = dir.removeDefaultAcl(src);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "removeDefaultAcl", srcArg); logAuditEvent(false, "removeDefaultAcl", src);
throw e; throw e;
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(true, "removeDefaultAcl", srcArg, null, resultingStat); logAuditEvent(true, "removeDefaultAcl", src, null, auditStat);
} }
void removeAcl(final String srcArg) throws IOException { void removeAcl(final String src) throws IOException {
String src = srcArg; HdfsFileStatus auditStat = null;
checkAclsConfigFlag();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot remove ACL on " + src); checkNameNodeSafeMode("Cannot remove ACL on " + src);
src = dir.resolvePath(pc, src, pathComponents); auditStat = FSDirAclOp.removeAcl(dir, src);
final INodesInPath iip = dir.getINodesInPath4Write(src);
dir.checkOwner(pc, iip);
dir.removeAcl(src);
getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "removeAcl", srcArg); logAuditEvent(false, "removeAcl", src);
throw e; throw e;
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(true, "removeAcl", srcArg, null, resultingStat); logAuditEvent(true, "removeAcl", src, null, auditStat);
} }
void setAcl(final String srcArg, List<AclEntry> aclSpec) throws IOException { void setAcl(final String src, List<AclEntry> aclSpec) throws IOException {
String src = srcArg; HdfsFileStatus auditStat = null;
checkAclsConfigFlag();
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot set ACL on " + src); checkNameNodeSafeMode("Cannot set ACL on " + src);
src = dir.resolvePath(pc, src, pathComponents); auditStat = FSDirAclOp.setAcl(dir, src, aclSpec);
final INodesInPath iip = dir.getINodesInPath4Write(src);
dir.checkOwner(pc, iip);
List<AclEntry> newAcl = dir.setAcl(src, aclSpec);
getEditLog().logSetAcl(src, newAcl);
resultingStat = getAuditFileInfo(src, false);
} catch (AccessControlException e) { } catch (AccessControlException e) {
logAuditEvent(false, "setAcl", srcArg); logAuditEvent(false, "setAcl", src);
throw e; throw e;
} finally { } finally {
writeUnlock(); writeUnlock();
} }
getEditLog().logSync(); getEditLog().logSync();
logAuditEvent(true, "setAcl", srcArg, null, resultingStat); logAuditEvent(true, "setAcl", src, null, auditStat);
} }
AclStatus getAclStatus(String src) throws IOException { AclStatus getAclStatus(String src) throws IOException {
checkAclsConfigFlag();
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean success = false; boolean success = false;
readLock(); readLock();
try { try {
checkOperation(OperationCategory.READ); checkOperation(OperationCategory.READ);
src = dir.resolvePath(pc, src, pathComponents); final AclStatus ret = FSDirAclOp.getAclStatus(dir, src);
INodesInPath iip = dir.getINodesInPath(src, true);
if (isPermissionEnabled) {
dir.checkPermission(pc, iip, false, null, null, null, null);
}
final AclStatus ret = dir.getAclStatus(src);
success = true; success = true;
return ret; return ret;
} finally { } finally {
@ -8370,15 +8312,6 @@ private static void enableAsyncAuditLog() {
} }
} }
private void checkAclsConfigFlag() throws AclException {
if (!aclsEnabled) {
throw new AclException(String.format(
"The ACL operation has been rejected. "
+ "Support for ACLs has been disabled by setting %s to false.",
DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY));
}
}
private void checkXAttrsConfigFlag() throws IOException { private void checkXAttrsConfigFlag() throws IOException {
if (!xattrsEnabled) { if (!xattrsEnabled) {
throw new IOException(String.format( throw new IOException(String.format(

View File

@ -18,21 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.junit.Assert.*;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -53,6 +38,22 @@
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
/** /**
* Tests for the {@link AuditLogger} custom audit logging interface. * Tests for the {@link AuditLogger} custom audit logging interface.
*/ */
@ -208,27 +209,10 @@ public void testAuditLogWithAclFailure() throws Exception {
try { try {
cluster.waitClusterUp(); cluster.waitClusterUp();
final FSDirectory dir = cluster.getNamesystem().getFSDirectory(); final FSDirectory dir = cluster.getNamesystem().getFSDirectory();
// Set up mock FSDirectory to test FSN audit logging during failure
final FSDirectory mockedDir = Mockito.spy(dir); final FSDirectory mockedDir = Mockito.spy(dir);
Mockito.doThrow(new AccessControlException("mock setAcl exception")). AccessControlException ex = new AccessControlException();
when(mockedDir). doThrow(ex).when(mockedDir).getPermissionChecker();
setAcl(anyString(), anyListOf(AclEntry.class));
Mockito.doThrow(new AccessControlException("mock getAclStatus exception")).
when(mockedDir).
getAclStatus(anyString());
Mockito.doThrow(new AccessControlException("mock removeAcl exception")).
when(mockedDir).
removeAcl(anyString());
Mockito.doThrow(new AccessControlException("mock removeDefaultAcl exception")).
when(mockedDir).
removeDefaultAcl(anyString());
Mockito.doThrow(new AccessControlException("mock removeAclEntries exception")).
when(mockedDir).
removeAclEntries(anyString(), anyListOf(AclEntry.class));
Mockito.doThrow(new AccessControlException("mock modifyAclEntries exception")).
when(mockedDir).
modifyAclEntries(anyString(), anyListOf(AclEntry.class));
// Replace the FSD with the mock FSD.
cluster.getNamesystem().setFSDirectory(mockedDir); cluster.getNamesystem().setFSDirectory(mockedDir);
assertTrue(DummyAuditLogger.initialized); assertTrue(DummyAuditLogger.initialized);
DummyAuditLogger.resetLogCount(); DummyAuditLogger.resetLogCount();
@ -239,39 +223,28 @@ public void testAuditLogWithAclFailure() throws Exception {
try { try {
fs.getAclStatus(p); fs.getAclStatus(p);
} catch (AccessControlException e) { } catch (AccessControlException ignored) {}
assertExceptionContains("mock getAclStatus exception", e);
}
try { try {
fs.setAcl(p, acls); fs.setAcl(p, acls);
} catch (AccessControlException e) { } catch (AccessControlException ignored) {}
assertExceptionContains("mock setAcl exception", e);
}
try { try {
fs.removeAcl(p); fs.removeAcl(p);
} catch (AccessControlException e) { } catch (AccessControlException ignored) {}
assertExceptionContains("mock removeAcl exception", e);
}
try { try {
fs.removeDefaultAcl(p); fs.removeDefaultAcl(p);
} catch (AccessControlException e) { } catch (AccessControlException ignored) {}
assertExceptionContains("mock removeDefaultAcl exception", e);
}
try { try {
fs.removeAclEntries(p, acls); fs.removeAclEntries(p, acls);
} catch (AccessControlException e) { } catch (AccessControlException ignored) {}
assertExceptionContains("mock removeAclEntries exception", e);
}
try { try {
fs.modifyAclEntries(p, acls); fs.modifyAclEntries(p, acls);
} catch (AccessControlException e) { } catch (AccessControlException ignored) {}
assertExceptionContains("mock modifyAclEntries exception", e);
}
assertEquals(6, DummyAuditLogger.logCount); assertEquals(6, DummyAuditLogger.logCount);
assertEquals(6, DummyAuditLogger.unsuccessfulCount); assertEquals(6, DummyAuditLogger.unsuccessfulCount);
} finally { } finally {