HDFS-15165. In Du missed calling getAttributesProvider. Contributed by Bharat Viswanadham.

This commit is contained in:
Inigo Goiri 2020-02-19 11:33:58 -08:00
parent 3f1aad05f0
commit ec7507162c
2 changed files with 45 additions and 8 deletions

View File

@ -206,15 +206,17 @@ public class FSPermissionChecker implements AccessControlEnforcer {
*/ */
void checkPermission(INode inode, int snapshotId, FsAction access) void checkPermission(INode inode, int snapshotId, FsAction access)
throws AccessControlException { throws AccessControlException {
byte[][] pathComponents = inode.getPathComponents();
INodeAttributes nodeAttributes = getINodeAttrs(pathComponents,
pathComponents.length - 1, inode, snapshotId);
try { try {
byte[][] localComponents = {inode.getLocalNameBytes()}; INodeAttributes[] iNodeAttr = {nodeAttributes};
INodeAttributes[] iNodeAttr = {inode.getSnapshotINode(snapshotId)};
AccessControlEnforcer enforcer = getAccessControlEnforcer(); AccessControlEnforcer enforcer = getAccessControlEnforcer();
enforcer.checkPermission( enforcer.checkPermission(
fsOwner, supergroup, callerUgi, fsOwner, supergroup, callerUgi,
iNodeAttr, // single inode attr in the array iNodeAttr, // single inode attr in the array
new INode[]{inode}, // single inode in the array new INode[]{inode}, // single inode in the array
localComponents, snapshotId, pathComponents, snapshotId,
null, -1, // this will skip checkTraverse() because null, -1, // this will skip checkTraverse() because
// not checking ancestor here // not checking ancestor here
false, null, null, false, null, null,
@ -223,7 +225,8 @@ public class FSPermissionChecker implements AccessControlEnforcer {
false); false);
} catch (AccessControlException ace) { } catch (AccessControlException ace) {
throw new AccessControlException( throw new AccessControlException(
toAccessControlString(inode, inode.getFullPathName(), access)); toAccessControlString(nodeAttributes, inode.getFullPathName(),
access));
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -53,6 +54,7 @@ public class TestINodeAttributeProvider {
private static final Set<String> CALLED = new HashSet<String>(); private static final Set<String> CALLED = new HashSet<String>();
private static final short HDFS_PERMISSION = 0777; private static final short HDFS_PERMISSION = 0777;
private static final short PROVIDER_PERMISSION = 0770; private static final short PROVIDER_PERMISSION = 0770;
private static boolean runPermissionCheck = false;
public static class MyAuthorizationProvider extends INodeAttributeProvider { public static class MyAuthorizationProvider extends INodeAttributeProvider {
@ -70,9 +72,9 @@ public class TestINodeAttributeProvider {
int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess, int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
FsAction parentAccess, FsAction access, FsAction subAccess, FsAction parentAccess, FsAction access, FsAction subAccess,
boolean ignoreEmptyDir) throws AccessControlException { boolean ignoreEmptyDir) throws AccessControlException {
if (ancestorIndex > 1 if ((ancestorIndex > 1
&& inodes[1].getLocalName().equals("user") && inodes[1].getLocalName().equals("user")
&& inodes[2].getLocalName().equals("acl")) { && inodes[2].getLocalName().equals("acl")) || runPermissionCheck) {
this.ace.checkPermission(fsOwner, supergroup, ugi, inodeAttrs, inodes, this.ace.checkPermission(fsOwner, supergroup, ugi, inodeAttrs, inodes,
pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner, pathByNameArr, snapshotId, path, ancestorIndex, doCheckOwner,
ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir); ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
@ -188,8 +190,7 @@ public class TestINodeAttributeProvider {
} }
private boolean useDefault(String[] pathElements) { private boolean useDefault(String[] pathElements) {
return (pathElements.length < 2) || return !Arrays.stream(pathElements).anyMatch("authz"::equals);
!(pathElements[0].equals("user") && pathElements[1].equals("authz"));
} }
private boolean useNullAclFeature(String[] pathElements) { private boolean useNullAclFeature(String[] pathElements) {
@ -220,6 +221,7 @@ public class TestINodeAttributeProvider {
miniDFS.shutdown(); miniDFS.shutdown();
miniDFS = null; miniDFS = null;
} }
runPermissionCheck = false;
Assert.assertTrue(CALLED.contains("stop")); Assert.assertTrue(CALLED.contains("stop"));
} }
@ -438,4 +440,36 @@ public class TestINodeAttributeProvider {
} }
}); });
} }
@Test
// HDFS-15165 - ContentSummary calls should use the provider permissions(if
// attribute provider is configured) and not the underlying HDFS permissions.
public void testContentSummary() throws Exception {
runPermissionCheck = true;
FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
final Path userPath = new Path("/user");
final Path authz = new Path("/user/authz");
final Path authzChild = new Path("/user/authz/child2");
// Create the path /user/authz/child2 where the HDFS permissions are
// 777, 700, 700.
// The permission provider will give permissions 770 to authz and child2
// with the owner foo, group bar.
fs.mkdirs(userPath);
fs.setPermission(userPath, new FsPermission(0777));
fs.mkdirs(authz);
fs.setPermission(authz, new FsPermission(0700));
fs.mkdirs(authzChild);
fs.setPermission(authzChild, new FsPermission(0700));
UserGroupInformation ugi = UserGroupInformation.createUserForTesting("foo",
new String[]{"g1"});
ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
FileSystem fs = FileSystem.get(miniDFS.getConfiguration(0));
fs.getContentSummary(authz);
return null;
}
});
}
} }