diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index fe44e5b402b..090ad938b88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -90,6 +90,9 @@ Release 2.7.3 - UNRELEASED HDFS-10488. Updated WebHDFS documentation regarding CREATE and MKDIR default permissions. (Wellington Chevreuil via wang) + HDFS-5802. NameNode does not check for inode type before traversing down a + path. (Xiao Chen via Yongjun Zhang) + OPTIMIZATIONS HDFS-8845. DiskChecker should not traverse the entire tree (Chang Li via diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java index e6570f544e6..00e651d52aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java @@ -192,6 +192,25 @@ class FSPermissionChecker implements AccessControlEnforcer { ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir); } + /** + * Check whether exception e is due to an ancestor inode's not being + * directory. + */ + private void checkAncestorType(INode[] inodes, int ancestorIndex, + AccessControlException e) throws AccessControlException { + for (int i = 0; i <= ancestorIndex; i++) { + if (inodes[i] == null) { + break; + } + if (!inodes[i].isDirectory()) { + throw new AccessControlException( + e.getMessage() + " (Ancestor " + inodes[i].getFullPathName() + + " is not a directory)."); + } + } + throw e; + } + @Override public void checkPermission(String fsOwner, String supergroup, UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs, @@ -202,7 +221,11 @@ class FSPermissionChecker implements AccessControlEnforcer { throws AccessControlException { for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null; ancestorIndex--); - checkTraverse(inodeAttrs, path, ancestorIndex); + try { + checkTraverse(inodeAttrs, path, ancestorIndex); + } catch (AccessControlException e) { + checkAncestorType(inodes, ancestorIndex, e); + } final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1]; if (parentAccess != null && parentAccess.implies(FsAction.WRITE) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java index 23ce916ae11..80b2eb44122 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPermission.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.security.PrivilegedExceptionAction; @@ -510,8 +511,45 @@ public class TestDFSPermission { } } - /* Check if namenode performs permission checking correctly - * for the given user for operations mkdir, open, setReplication, + @Test + public void testPermissionMessageOnNonDirAncestor() + throws IOException, InterruptedException { + FileSystem rootFs = FileSystem.get(conf); + Path p4 = new Path("/p4"); + rootFs.mkdirs(p4); + rootFs.setOwner(p4, USER1_NAME, GROUP1_NAME); + + final Path fpath = new Path("/p4/file"); + DataOutputStream out = rootFs.create(fpath); + out.writeBytes("dhruba: " + fpath); + out.close(); + rootFs.setOwner(fpath, USER1_NAME, GROUP1_NAME); + assertTrue(rootFs.exists(fpath)); + + fs = USER1.doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws Exception { + return FileSystem.get(conf); + } + }); + + final Path nfpath = new Path("/p4/file/nonexisting"); + assertFalse(rootFs.exists(nfpath)); + + try { + fs.exists(nfpath); + fail("The exists call should have failed."); + } catch (AccessControlException e) { + assertTrue("Permission denied messages must carry file path", + e.getMessage().contains(fpath.getName())); + assertTrue("Permission denied messages must specify existing_file is not " + + "a directory, when checked on /existing_file/non_existing_name", + e.getMessage().contains("is not a directory")); + } + } + + /* Check if namenode performs permission checking correctly + * for the given user for operations mkdir, open, setReplication, * getFileInfo, isDirectory, exists, getContentLength, list, rename, * and delete */ private void testPermissionCheckingPerUser(UserGroupInformation ugi,