diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3afcf2bf290..a4f4bc3ac54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -181,6 +181,9 @@ Release 2.7.0 - UNRELEASED HDFS-7459. Consolidate cache-related implementation in FSNamesystem into a single class. (wheat9) + HDFS-7476. Consolidate ACL-related operations to a single class. + (wheat9 via cnauroth) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java index ac30597bd73..a86604686fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclStorage.java @@ -240,39 +240,6 @@ final class AclStorage { return existingAcl; } - /** - * Completely removes the ACL from an inode. - * - * @param inode INode to update - * @param snapshotId int latest snapshot ID of inode - * @throws QuotaExceededException if quota limit is exceeded - */ - public static void removeINodeAcl(INode inode, int snapshotId) - throws QuotaExceededException { - AclFeature f = inode.getAclFeature(); - if (f == null) { - return; - } - - FsPermission perm = inode.getFsPermission(); - List featureEntries = getEntriesFromAclFeature(f); - if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) { - // Restore group permissions from the feature's entry to permission - // bits, overwriting the mask, which is not part of a minimal ACL. - AclEntry groupEntryKey = new AclEntry.Builder() - .setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build(); - int groupEntryIndex = Collections.binarySearch(featureEntries, - groupEntryKey, AclTransformation.ACL_ENTRY_COMPARATOR); - assert groupEntryIndex >= 0; - FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission(); - FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm, - perm.getOtherAction(), perm.getStickyBit()); - inode.setPermission(newPerm, snapshotId); - } - - inode.removeAclFeature(snapshotId); - } - /** * Updates an inode with a new ACL. This method takes a full logical ACL and * stores the entries to the inode's {@link FsPermission} and diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java new file mode 100644 index 00000000000..ac899aab362 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAclOp.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclEntryType; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.AclException; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +class FSDirAclOp { + static HdfsFileStatus modifyAclEntries( + FSDirectory fsd, final String srcArg, List aclSpec) + throws IOException { + String src = srcArg; + checkAclsConfigFlag(fsd); + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + fsd.writeLock(); + try { + INodesInPath iip = fsd.getINodesInPath4Write( + FSDirectory.normalizePath(src), true); + fsd.checkOwner(pc, iip); + INode inode = FSDirectory.resolveLastINode(src, iip); + int snapshotId = iip.getLatestSnapshotId(); + List existingAcl = AclStorage.readINodeLogicalAcl(inode); + List newAcl = AclTransformation.mergeAclEntries( + existingAcl, aclSpec); + AclStorage.updateINodeAcl(inode, newAcl, snapshotId); + fsd.getEditLog().logSetAcl(src, newAcl); + } finally { + fsd.writeUnlock(); + } + return fsd.getAuditFileInfo(src, false); + } + + static HdfsFileStatus removeAclEntries( + FSDirectory fsd, final String srcArg, List aclSpec) + throws IOException { + String src = srcArg; + checkAclsConfigFlag(fsd); + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + fsd.writeLock(); + try { + INodesInPath iip = fsd.getINodesInPath4Write( + FSDirectory.normalizePath(src), true); + fsd.checkOwner(pc, iip); + INode inode = FSDirectory.resolveLastINode(src, iip); + int snapshotId = iip.getLatestSnapshotId(); + List existingAcl = AclStorage.readINodeLogicalAcl(inode); + List newAcl = AclTransformation.filterAclEntriesByAclSpec( + existingAcl, aclSpec); + AclStorage.updateINodeAcl(inode, newAcl, snapshotId); + fsd.getEditLog().logSetAcl(src, newAcl); + } finally { + fsd.writeUnlock(); + } + return fsd.getAuditFileInfo(src, false); + } + + static HdfsFileStatus removeDefaultAcl(FSDirectory fsd, final String srcArg) + throws IOException { + String src = srcArg; + checkAclsConfigFlag(fsd); + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + fsd.writeLock(); + try { + INodesInPath iip = fsd.getINodesInPath4Write( + FSDirectory.normalizePath(src), true); + fsd.checkOwner(pc, iip); + INode inode = FSDirectory.resolveLastINode(src, iip); + int snapshotId = iip.getLatestSnapshotId(); + List existingAcl = AclStorage.readINodeLogicalAcl(inode); + List newAcl = AclTransformation.filterDefaultAclEntries( + existingAcl); + AclStorage.updateINodeAcl(inode, newAcl, snapshotId); + fsd.getEditLog().logSetAcl(src, newAcl); + } finally { + fsd.writeUnlock(); + } + return fsd.getAuditFileInfo(src, false); + } + + static HdfsFileStatus removeAcl(FSDirectory fsd, final String srcArg) + throws IOException { + String src = srcArg; + checkAclsConfigFlag(fsd); + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + fsd.writeLock(); + try { + INodesInPath iip = fsd.getINodesInPath4Write(src); + fsd.checkOwner(pc, iip); + unprotectedRemoveAcl(fsd, src); + } finally { + fsd.writeUnlock(); + } + fsd.getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST); + return fsd.getAuditFileInfo(src, false); + } + + static HdfsFileStatus setAcl( + FSDirectory fsd, final String srcArg, List aclSpec) + throws IOException { + String src = srcArg; + checkAclsConfigFlag(fsd); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + FSPermissionChecker pc = fsd.getPermissionChecker(); + src = fsd.resolvePath(pc, src, pathComponents); + fsd.writeLock(); + try { + INodesInPath iip = fsd.getINodesInPath4Write(src); + fsd.checkOwner(pc, iip); + List newAcl = unprotectedSetAcl(fsd, src, aclSpec); + fsd.getEditLog().logSetAcl(src, newAcl); + } finally { + fsd.writeUnlock(); + } + return fsd.getAuditFileInfo(src, false); + } + + static AclStatus getAclStatus( + FSDirectory fsd, String src) throws IOException { + checkAclsConfigFlag(fsd); + FSPermissionChecker pc = fsd.getPermissionChecker(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + src = fsd.resolvePath(pc, src, pathComponents); + String srcs = FSDirectory.normalizePath(src); + fsd.readLock(); + try { + // There is no real inode for the path ending in ".snapshot", so return a + // non-null, unpopulated AclStatus. This is similar to getFileInfo. + if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) && + fsd.getINode4DotSnapshot(srcs) != null) { + return new AclStatus.Builder().owner("").group("").build(); + } + INodesInPath iip = fsd.getINodesInPath(srcs, true); + if (fsd.isPermissionEnabled()) { + fsd.checkTraverse(pc, iip); + } + INode inode = FSDirectory.resolveLastINode(srcs, iip); + int snapshotId = iip.getPathSnapshotId(); + List acl = AclStorage.readINodeAcl(inode, snapshotId); + return new AclStatus.Builder() + .owner(inode.getUserName()).group(inode.getGroupName()) + .stickyBit(inode.getFsPermission(snapshotId).getStickyBit()) + .addEntries(acl).build(); + } finally { + fsd.readUnlock(); + } + } + + static List unprotectedSetAcl( + FSDirectory fsd, String src, List aclSpec) + throws IOException { + // ACL removal is logged to edits as OP_SET_ACL with an empty list. + if (aclSpec.isEmpty()) { + unprotectedRemoveAcl(fsd, src); + return AclFeature.EMPTY_ENTRY_LIST; + } + + assert fsd.hasWriteLock(); + INodesInPath iip = fsd.getINodesInPath4Write(FSDirectory.normalizePath + (src), true); + INode inode = FSDirectory.resolveLastINode(src, iip); + int snapshotId = iip.getLatestSnapshotId(); + List existingAcl = AclStorage.readINodeLogicalAcl(inode); + List newAcl = AclTransformation.replaceAclEntries(existingAcl, + aclSpec); + AclStorage.updateINodeAcl(inode, newAcl, snapshotId); + return newAcl; + } + + private static void checkAclsConfigFlag(FSDirectory fsd) throws AclException { + if (!fsd.isAclsEnabled()) { + throw new AclException(String.format( + "The ACL operation has been rejected. " + + "Support for ACLs has been disabled by setting %s to false.", + DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY)); + } + } + + private static void unprotectedRemoveAcl(FSDirectory fsd, String src) + throws IOException { + assert fsd.hasWriteLock(); + INodesInPath iip = fsd.getINodesInPath4Write( + FSDirectory.normalizePath(src), true); + INode inode = FSDirectory.resolveLastINode(src, iip); + int snapshotId = iip.getLatestSnapshotId(); + AclFeature f = inode.getAclFeature(); + if (f == null) { + return; + } + + FsPermission perm = inode.getFsPermission(); + List featureEntries = AclStorage.getEntriesFromAclFeature(f); + if (featureEntries.get(0).getScope() == AclEntryScope.ACCESS) { + // Restore group permissions from the feature's entry to permission + // bits, overwriting the mask, which is not part of a minimal ACL. + AclEntry groupEntryKey = new AclEntry.Builder() + .setScope(AclEntryScope.ACCESS).setType(AclEntryType.GROUP).build(); + int groupEntryIndex = Collections.binarySearch( + featureEntries, groupEntryKey, + AclTransformation.ACL_ENTRY_COMPARATOR); + assert groupEntryIndex >= 0; + FsAction groupPerm = featureEntries.get(groupEntryIndex).getPermission(); + FsPermission newPerm = new FsPermission(perm.getUserAction(), groupPerm, + perm.getOtherAction(), perm.getStickyBit()); + inode.setPermission(newPerm, snapshotId); + } + + inode.removeAclFeature(snapshotId); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 444589e3664..82741ce0aa9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -48,7 +48,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; @@ -143,6 +142,12 @@ public class FSDirectory implements Closeable { private final ReentrantReadWriteLock dirLock; private final boolean isPermissionEnabled; + /** + * Support for ACLs is controlled by a configuration flag. If the + * configuration flag is false, then the NameNode will reject all + * ACL-related operations. + */ + private final boolean aclsEnabled; private final String fsOwnerShortUserName; private final String supergroup; private final INodeId inodeId; @@ -204,7 +209,10 @@ public class FSDirectory implements Closeable { this.supergroup = conf.get( DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); - + this.aclsEnabled = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, + DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT); + LOG.info("ACLs enabled? " + aclsEnabled); int configuredLimit = conf.getInt( DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); this.lsLimit = configuredLimit>0 ? @@ -263,6 +271,9 @@ public class FSDirectory implements Closeable { boolean isPermissionEnabled() { return isPermissionEnabled; } + boolean isAclsEnabled() { + return aclsEnabled; + } int getLsLimit() { return lsLimit; @@ -1549,140 +1560,6 @@ public class FSDirectory implements Closeable { return addINode(path, symlink) ? symlink : null; } - List modifyAclEntries(String src, List aclSpec) throws IOException { - writeLock(); - try { - return unprotectedModifyAclEntries(src, aclSpec); - } finally { - writeUnlock(); - } - } - - private List unprotectedModifyAclEntries(String src, - List aclSpec) throws IOException { - assert hasWriteLock(); - INodesInPath iip = getINodesInPath4Write(normalizePath(src), true); - INode inode = resolveLastINode(src, iip); - int snapshotId = iip.getLatestSnapshotId(); - List existingAcl = AclStorage.readINodeLogicalAcl(inode); - List newAcl = AclTransformation.mergeAclEntries(existingAcl, - aclSpec); - AclStorage.updateINodeAcl(inode, newAcl, snapshotId); - return newAcl; - } - - List removeAclEntries(String src, List aclSpec) throws IOException { - writeLock(); - try { - return unprotectedRemoveAclEntries(src, aclSpec); - } finally { - writeUnlock(); - } - } - - private List unprotectedRemoveAclEntries(String src, - List aclSpec) throws IOException { - assert hasWriteLock(); - INodesInPath iip = getINodesInPath4Write(normalizePath(src), true); - INode inode = resolveLastINode(src, iip); - int snapshotId = iip.getLatestSnapshotId(); - List existingAcl = AclStorage.readINodeLogicalAcl(inode); - List newAcl = AclTransformation.filterAclEntriesByAclSpec( - existingAcl, aclSpec); - AclStorage.updateINodeAcl(inode, newAcl, snapshotId); - return newAcl; - } - - List removeDefaultAcl(String src) throws IOException { - writeLock(); - try { - return unprotectedRemoveDefaultAcl(src); - } finally { - writeUnlock(); - } - } - - private List unprotectedRemoveDefaultAcl(String src) - throws IOException { - assert hasWriteLock(); - INodesInPath iip = getINodesInPath4Write(normalizePath(src), true); - INode inode = resolveLastINode(src, iip); - int snapshotId = iip.getLatestSnapshotId(); - List existingAcl = AclStorage.readINodeLogicalAcl(inode); - List newAcl = AclTransformation.filterDefaultAclEntries( - existingAcl); - AclStorage.updateINodeAcl(inode, newAcl, snapshotId); - return newAcl; - } - - void removeAcl(String src) throws IOException { - writeLock(); - try { - unprotectedRemoveAcl(src); - } finally { - writeUnlock(); - } - } - - private void unprotectedRemoveAcl(String src) throws IOException { - assert hasWriteLock(); - INodesInPath iip = getINodesInPath4Write(normalizePath(src), true); - INode inode = resolveLastINode(src, iip); - int snapshotId = iip.getLatestSnapshotId(); - AclStorage.removeINodeAcl(inode, snapshotId); - } - - List setAcl(String src, List aclSpec) throws IOException { - writeLock(); - try { - return unprotectedSetAcl(src, aclSpec); - } finally { - writeUnlock(); - } - } - - List unprotectedSetAcl(String src, List aclSpec) - throws IOException { - // ACL removal is logged to edits as OP_SET_ACL with an empty list. - if (aclSpec.isEmpty()) { - unprotectedRemoveAcl(src); - return AclFeature.EMPTY_ENTRY_LIST; - } - - assert hasWriteLock(); - INodesInPath iip = getINodesInPath4Write(normalizePath(src), true); - INode inode = resolveLastINode(src, iip); - int snapshotId = iip.getLatestSnapshotId(); - List existingAcl = AclStorage.readINodeLogicalAcl(inode); - List newAcl = AclTransformation.replaceAclEntries(existingAcl, - aclSpec); - AclStorage.updateINodeAcl(inode, newAcl, snapshotId); - return newAcl; - } - - AclStatus getAclStatus(String src) throws IOException { - String srcs = normalizePath(src); - readLock(); - try { - // There is no real inode for the path ending in ".snapshot", so return a - // non-null, unpopulated AclStatus. This is similar to getFileInfo. - if (srcs.endsWith(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR) && - getINode4DotSnapshot(srcs) != null) { - return new AclStatus.Builder().owner("").group("").build(); - } - INodesInPath iip = getLastINodeInPath(srcs, true); - INode inode = resolveLastINode(src, iip); - int snapshotId = iip.getPathSnapshotId(); - List acl = AclStorage.readINodeAcl(inode, snapshotId); - return new AclStatus.Builder() - .owner(inode.getUserName()).group(inode.getGroupName()) - .stickyBit(inode.getFsPermission(snapshotId).getStickyBit()) - .addEntries(acl).build(); - } finally { - readUnlock(); - } - } - /** * Removes a list of XAttrs from an inode at a path. * @@ -2065,9 +1942,10 @@ public class FSDirectory implements Closeable { return null; } - private static INode resolveLastINode(String src, INodesInPath iip) + static INode resolveLastINode(String src, INodesInPath iip) throws FileNotFoundException { - INode inode = iip.getLastINode(); + INode[] inodes = iip.getINodes(); + INode inode = inodes[inodes.length - 1]; if (inode == null) throw new FileNotFoundException("cannot find " + src); return inode; @@ -2246,8 +2124,8 @@ public class FSDirectory implements Closeable { } /** @return the {@link INodesInPath} containing only the last inode. */ - private INodesInPath getLastINodeInPath(String path, boolean resolveLink - ) throws UnresolvedLinkException { + INodesInPath getLastINodeInPath( + String path, boolean resolveLink) throws UnresolvedLinkException { return INodesInPath.resolve(rootDir, INode.getPathComponents(path), 1, resolveLink); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 07261b360f8..efc5b972640 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -827,7 +827,7 @@ public class FSEditLogLoader { } case OP_SET_ACL: { SetAclOp setAclOp = (SetAclOp) op; - fsDir.unprotectedSetAcl(setAclOp.src, setAclOp.aclEntries); + FSDirAclOp.unprotectedSetAcl(fsDir, setAclOp.src, setAclOp.aclEntries); break; } case OP_SET_XATTR: { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index f865f4f8576..7cd0c00a8a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -521,7 +521,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, private final RetryCache retryCache; - private final boolean aclsEnabled; private final boolean xattrsEnabled; private final int xattrMaxSize; @@ -842,10 +841,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, auditLoggers.get(0) instanceof DefaultAuditLogger; this.retryCache = ignoreRetryCache ? null : initRetryCache(conf); - this.aclsEnabled = conf.getBoolean( - DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, - DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_DEFAULT); - LOG.info("ACLs enabled? " + aclsEnabled); this.xattrsEnabled = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_DEFAULT); @@ -7731,158 +7726,105 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return results; } - void modifyAclEntries(final String srcArg, List aclSpec) + void modifyAclEntries(final String src, List aclSpec) throws IOException { - String src = srcArg; - checkAclsConfigFlag(); - HdfsFileStatus resultingStat = null; - FSPermissionChecker pc = getPermissionChecker(); + HdfsFileStatus auditStat = null; checkOperation(OperationCategory.WRITE); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot modify ACL entries on " + src); - src = dir.resolvePath(pc, src, pathComponents); - final INodesInPath iip = dir.getINodesInPath4Write(src); - dir.checkOwner(pc, iip); - List newAcl = dir.modifyAclEntries(src, aclSpec); - getEditLog().logSetAcl(src, newAcl); - resultingStat = getAuditFileInfo(src, false); + auditStat = FSDirAclOp.modifyAclEntries(dir, src, aclSpec); } catch (AccessControlException e) { - logAuditEvent(false, "modifyAclEntries", srcArg); + logAuditEvent(false, "modifyAclEntries", src); throw e; } finally { writeUnlock(); } getEditLog().logSync(); - logAuditEvent(true, "modifyAclEntries", srcArg, null, resultingStat); + logAuditEvent(true, "modifyAclEntries", src, null, auditStat); } - void removeAclEntries(final String srcArg, List aclSpec) + void removeAclEntries(final String src, List aclSpec) throws IOException { - String src = srcArg; - checkAclsConfigFlag(); - HdfsFileStatus resultingStat = null; - FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + HdfsFileStatus auditStat = null; writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot remove ACL entries on " + src); - src = dir.resolvePath(pc, src, pathComponents); - final INodesInPath iip = dir.getINodesInPath4Write(src); - dir.checkOwner(pc, iip); - List newAcl = dir.removeAclEntries(src, aclSpec); - getEditLog().logSetAcl(src, newAcl); - resultingStat = getAuditFileInfo(src, false); + auditStat = FSDirAclOp.removeAclEntries(dir, src, aclSpec); } catch (AccessControlException e) { - logAuditEvent(false, "removeAclEntries", srcArg); + logAuditEvent(false, "removeAclEntries", src); throw e; } finally { writeUnlock(); } getEditLog().logSync(); - logAuditEvent(true, "removeAclEntries", srcArg, null, resultingStat); + logAuditEvent(true, "removeAclEntries", src, null, auditStat); } - void removeDefaultAcl(final String srcArg) throws IOException { - String src = srcArg; - checkAclsConfigFlag(); - HdfsFileStatus resultingStat = null; - FSPermissionChecker pc = getPermissionChecker(); + void removeDefaultAcl(final String src) throws IOException { + HdfsFileStatus auditStat = null; checkOperation(OperationCategory.WRITE); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot remove default ACL entries on " + src); - src = dir.resolvePath(pc, src, pathComponents); - final INodesInPath iip = dir.getINodesInPath4Write(src); - dir.checkOwner(pc, iip); - List newAcl = dir.removeDefaultAcl(src); - getEditLog().logSetAcl(src, newAcl); - resultingStat = getAuditFileInfo(src, false); + auditStat = FSDirAclOp.removeDefaultAcl(dir, src); } catch (AccessControlException e) { - logAuditEvent(false, "removeDefaultAcl", srcArg); + logAuditEvent(false, "removeDefaultAcl", src); throw e; } finally { writeUnlock(); } getEditLog().logSync(); - logAuditEvent(true, "removeDefaultAcl", srcArg, null, resultingStat); + logAuditEvent(true, "removeDefaultAcl", src, null, auditStat); } - void removeAcl(final String srcArg) throws IOException { - String src = srcArg; - checkAclsConfigFlag(); - HdfsFileStatus resultingStat = null; - FSPermissionChecker pc = getPermissionChecker(); + void removeAcl(final String src) throws IOException { + HdfsFileStatus auditStat = null; checkOperation(OperationCategory.WRITE); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot remove ACL on " + src); - src = dir.resolvePath(pc, src, pathComponents); - final INodesInPath iip = dir.getINodesInPath4Write(src); - dir.checkOwner(pc, iip); - dir.removeAcl(src); - getEditLog().logSetAcl(src, AclFeature.EMPTY_ENTRY_LIST); - resultingStat = getAuditFileInfo(src, false); + auditStat = FSDirAclOp.removeAcl(dir, src); } catch (AccessControlException e) { - logAuditEvent(false, "removeAcl", srcArg); + logAuditEvent(false, "removeAcl", src); throw e; } finally { writeUnlock(); } getEditLog().logSync(); - logAuditEvent(true, "removeAcl", srcArg, null, resultingStat); + logAuditEvent(true, "removeAcl", src, null, auditStat); } - void setAcl(final String srcArg, List aclSpec) throws IOException { - String src = srcArg; - checkAclsConfigFlag(); - HdfsFileStatus resultingStat = null; - FSPermissionChecker pc = getPermissionChecker(); + void setAcl(final String src, List aclSpec) throws IOException { + HdfsFileStatus auditStat = null; checkOperation(OperationCategory.WRITE); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); writeLock(); try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot set ACL on " + src); - src = dir.resolvePath(pc, src, pathComponents); - final INodesInPath iip = dir.getINodesInPath4Write(src); - dir.checkOwner(pc, iip); - List newAcl = dir.setAcl(src, aclSpec); - getEditLog().logSetAcl(src, newAcl); - resultingStat = getAuditFileInfo(src, false); + auditStat = FSDirAclOp.setAcl(dir, src, aclSpec); } catch (AccessControlException e) { - logAuditEvent(false, "setAcl", srcArg); + logAuditEvent(false, "setAcl", src); throw e; } finally { writeUnlock(); } getEditLog().logSync(); - logAuditEvent(true, "setAcl", srcArg, null, resultingStat); + logAuditEvent(true, "setAcl", src, null, auditStat); } AclStatus getAclStatus(String src) throws IOException { - checkAclsConfigFlag(); - FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.READ); - byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); boolean success = false; readLock(); try { checkOperation(OperationCategory.READ); - src = dir.resolvePath(pc, src, pathComponents); - INodesInPath iip = dir.getINodesInPath(src, true); - if (isPermissionEnabled) { - dir.checkPermission(pc, iip, false, null, null, null, null); - } - final AclStatus ret = dir.getAclStatus(src); + final AclStatus ret = FSDirAclOp.getAclStatus(dir, src); success = true; return ret; } finally { @@ -8370,15 +8312,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } } - private void checkAclsConfigFlag() throws AclException { - if (!aclsEnabled) { - throw new AclException(String.format( - "The ACL operation has been rejected. " - + "Support for ACLs has been disabled by setting %s to false.", - DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY)); - } - } - private void checkXAttrsConfigFlag() throws IOException { if (!xattrsEnabled) { throw new IOException(String.format( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java index c91cd75bbcb..0c119bf3597 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogger.java @@ -18,21 +18,6 @@ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY; -import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; -import static org.junit.Assert.*; -import static org.mockito.Matchers.anyListOf; -import static org.mockito.Matchers.anyString; - -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; - import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -53,6 +38,22 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.NNTOP_ENABLED_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doThrow; + /** * Tests for the {@link AuditLogger} custom audit logging interface. */ @@ -208,27 +209,10 @@ public class TestAuditLogger { try { cluster.waitClusterUp(); final FSDirectory dir = cluster.getNamesystem().getFSDirectory(); - // Set up mock FSDirectory to test FSN audit logging during failure + final FSDirectory mockedDir = Mockito.spy(dir); - Mockito.doThrow(new AccessControlException("mock setAcl exception")). - when(mockedDir). - setAcl(anyString(), anyListOf(AclEntry.class)); - Mockito.doThrow(new AccessControlException("mock getAclStatus exception")). - when(mockedDir). - getAclStatus(anyString()); - Mockito.doThrow(new AccessControlException("mock removeAcl exception")). - when(mockedDir). - removeAcl(anyString()); - Mockito.doThrow(new AccessControlException("mock removeDefaultAcl exception")). - when(mockedDir). - removeDefaultAcl(anyString()); - Mockito.doThrow(new AccessControlException("mock removeAclEntries exception")). - when(mockedDir). - removeAclEntries(anyString(), anyListOf(AclEntry.class)); - Mockito.doThrow(new AccessControlException("mock modifyAclEntries exception")). - when(mockedDir). - modifyAclEntries(anyString(), anyListOf(AclEntry.class)); - // Replace the FSD with the mock FSD. + AccessControlException ex = new AccessControlException(); + doThrow(ex).when(mockedDir).getPermissionChecker(); cluster.getNamesystem().setFSDirectory(mockedDir); assertTrue(DummyAuditLogger.initialized); DummyAuditLogger.resetLogCount(); @@ -239,39 +223,28 @@ public class TestAuditLogger { try { fs.getAclStatus(p); - } catch (AccessControlException e) { - assertExceptionContains("mock getAclStatus exception", e); - } + } catch (AccessControlException ignored) {} try { fs.setAcl(p, acls); - } catch (AccessControlException e) { - assertExceptionContains("mock setAcl exception", e); - } + } catch (AccessControlException ignored) {} try { fs.removeAcl(p); - } catch (AccessControlException e) { - assertExceptionContains("mock removeAcl exception", e); - } + } catch (AccessControlException ignored) {} try { fs.removeDefaultAcl(p); - } catch (AccessControlException e) { - assertExceptionContains("mock removeDefaultAcl exception", e); - } + } catch (AccessControlException ignored) {} try { fs.removeAclEntries(p, acls); - } catch (AccessControlException e) { - assertExceptionContains("mock removeAclEntries exception", e); - } + } catch (AccessControlException ignored) {} try { fs.modifyAclEntries(p, acls); - } catch (AccessControlException e) { - assertExceptionContains("mock modifyAclEntries exception", e); - } + } catch (AccessControlException ignored) {} + assertEquals(6, DummyAuditLogger.logCount); assertEquals(6, DummyAuditLogger.unsuccessfulCount); } finally {