HDFS-5802. NameNode does not check for inode type before traversing down a path. (Xiao Chen via Yongjun Zhang)

(cherry picked from commit 9859fd006b)
This commit is contained in:
Yongjun Zhang 2015-09-17 22:56:14 -07:00 committed by Zhe Zhang
parent e807fde448
commit 97e4b13038
3 changed files with 67 additions and 3 deletions

View File

@ -90,6 +90,9 @@ Release 2.7.3 - UNRELEASED
HDFS-10488. Updated WebHDFS documentation regarding CREATE and MKDIR default
permissions. (Wellington Chevreuil via wang)
HDFS-5802. NameNode does not check for inode type before traversing down a
path. (Xiao Chen via Yongjun Zhang)
OPTIMIZATIONS
HDFS-8845. DiskChecker should not traverse the entire tree (Chang Li via

View File

@ -192,6 +192,25 @@ class FSPermissionChecker implements AccessControlEnforcer {
ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
}
/**
* Check whether exception e is due to an ancestor inode's not being
* directory.
*/
private void checkAncestorType(INode[] inodes, int ancestorIndex,
AccessControlException e) throws AccessControlException {
for (int i = 0; i <= ancestorIndex; i++) {
if (inodes[i] == null) {
break;
}
if (!inodes[i].isDirectory()) {
throw new AccessControlException(
e.getMessage() + " (Ancestor " + inodes[i].getFullPathName()
+ " is not a directory).");
}
}
throw e;
}
@Override
public void checkPermission(String fsOwner, String supergroup,
UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
@ -202,7 +221,11 @@ class FSPermissionChecker implements AccessControlEnforcer {
throws AccessControlException {
for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
ancestorIndex--);
checkTraverse(inodeAttrs, path, ancestorIndex);
try {
checkTraverse(inodeAttrs, path, ancestorIndex);
} catch (AccessControlException e) {
checkAncestorType(inodes, ancestorIndex, e);
}
final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
if (parentAccess != null && parentAccess.implies(FsAction.WRITE)

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@ -510,8 +511,45 @@ public class TestDFSPermission {
}
}
/* Check if namenode performs permission checking correctly
* for the given user for operations mkdir, open, setReplication,
@Test
public void testPermissionMessageOnNonDirAncestor()
throws IOException, InterruptedException {
FileSystem rootFs = FileSystem.get(conf);
Path p4 = new Path("/p4");
rootFs.mkdirs(p4);
rootFs.setOwner(p4, USER1_NAME, GROUP1_NAME);
final Path fpath = new Path("/p4/file");
DataOutputStream out = rootFs.create(fpath);
out.writeBytes("dhruba: " + fpath);
out.close();
rootFs.setOwner(fpath, USER1_NAME, GROUP1_NAME);
assertTrue(rootFs.exists(fpath));
fs = USER1.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return FileSystem.get(conf);
}
});
final Path nfpath = new Path("/p4/file/nonexisting");
assertFalse(rootFs.exists(nfpath));
try {
fs.exists(nfpath);
fail("The exists call should have failed.");
} catch (AccessControlException e) {
assertTrue("Permission denied messages must carry file path",
e.getMessage().contains(fpath.getName()));
assertTrue("Permission denied messages must specify existing_file is not "
+ "a directory, when checked on /existing_file/non_existing_name",
e.getMessage().contains("is not a directory"));
}
}
/* Check if namenode performs permission checking correctly
* for the given user for operations mkdir, open, setReplication,
* getFileInfo, isDirectory, exists, getContentLength, list, rename,
* and delete */
private void testPermissionCheckingPerUser(UserGroupInformation ugi,