HDFS-16141. [FGL] Address permission related issues with File / Directory. Contributed by Renukaprasad C. (#3205)

prasad-acit 2021-08-13 11:51:18 -07:00 committed by Konstantin V Shvachko
parent 88484fceb4
commit 892fa48c5a
7 changed files with 76 additions and 11 deletions

FSDirMkdirOp.java

@@ -70,7 +70,7 @@ class FSDirMkdirOp {
// create multiple inodes.
fsn.checkFsObjectLimit();
- iip = createMissingDirs(fsd, iip, permissions);
+ iip = createMissingDirs(fsd, iip, permissions, false);
}
return fsd.getAuditFileInfo(iip);
} finally {
@@ -78,11 +78,14 @@ class FSDirMkdirOp {
}
}
- static INodesInPath createMissingDirs(FSDirectory fsd,
- INodesInPath iip, PermissionStatus permissions) throws IOException {
+ static INodesInPath createMissingDirs(FSDirectory fsd, INodesInPath iip,
+ PermissionStatus permissions, boolean inheritPerms) throws IOException {
+ PermissionStatus basePerm = inheritPerms ?
+ iip.getExistingINodes().getLastINode().getPermissionStatus() :
+ permissions;
// create all missing directories along the path,
// but don't add them to the INodeMap yet
- permissions = addImplicitUwx(permissions, permissions); // SHV !!!
+ permissions = addImplicitUwx(basePerm, permissions);
INode[] missing = createPathDirectories(fsd, iip, permissions);
iip = iip.getExistingINodes();
if (missing.length == 0) {
@@ -90,8 +93,15 @@
}
// switch the locks
fsd.getINodeMap().latchWriteLock(iip, missing);
+ int counter = 0;
// Add missing inodes to the INodeMap
for (INode dir : missing) {
+ if (counter++ == missing.length - 1) {
+ //Last folder in the path, use the user given permission
+ //For MKDIR - refers to the permission given by the user
+ //For create - refers to the parent directory permission.
+ permissions = basePerm;
+ }
iip = addSingleDirectory(fsd, iip, dir, permissions);
assert iip != null : "iip should not be null";
}
@@ -279,13 +289,10 @@ class FSDirMkdirOp {
// create the missing directories along the path
INode[] missing = new INode[numMissing];
final int last = iip.length();
- INode parent = existing.getLastINode();
for (int i = existing.length(); i < last; i++) {
byte[] component = iip.getPathComponent(i);
missing[i - existing.length()] =
createDirectoryINode(fsd, existing, component, perm);
- missing[i - existing.length()].setParent(parent.asDirectory());
- parent = missing[i - existing.length()];
}
return missing;
}
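
In the hunks above, the new inheritPerms flag selects the base permission for the missing directories: the caller-supplied PermissionStatus for an explicit mkdir, or the permission of the deepest existing INode on the path when directories are created implicitly. Intermediate directories still receive implicit owner write/execute bits via addImplicitUwx, and only the last missing directory gets the base permission unchanged. A minimal, self-contained sketch of that selection (not HDFS code; plain mode bits stand in for PermissionStatus, all names here are illustrative):

// Illustrative only: models the permission choice made in createMissingDirs.
final class MissingDirPermsSketch {
  /**
   * @param missingCount    number of directories that must be created
   * @param requested       mode bits the caller asked for
   * @param deepestExisting mode bits of the last existing directory on the path
   * @param inheritPerms    true for implicit creation (file create), false for mkdir
   */
  static short[] permsForMissingDirs(int missingCount, short requested,
      short deepestExisting, boolean inheritPerms) {
    short base = inheritPerms ? deepestExisting : requested;
    short withImplicitUwx = (short) (base | 0300); // add owner write+execute
    short[] perms = new short[missingCount];
    for (int i = 0; i < missingCount; i++) {
      // Intermediate directories get the implicit bits; the last one
      // gets the base permission as-is.
      perms[i] = (i == missingCount - 1) ? base : withImplicitUwx;
    }
    return perms;
  }
}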

FSDirWriteFileOp.java

@@ -400,7 +400,7 @@ class FSDirWriteFileOp {
fsn.checkFsObjectLimit();
INodeFile newNode = null;
INodesInPath parent = FSDirMkdirOp.createMissingDirs(fsd,
- iip.getParentINodesInPath(), permissions);
+ iip.getParentINodesInPath(), permissions, true);
if (parent != null) {
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine, shouldReplicate,
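
Passing true here means directories created implicitly on the file-create path inherit their permission from the nearest existing ancestor, while FSDirMkdirOp keeps passing false so an explicit mkdir applies the caller's permission to the final directory. A hedged, client-level illustration of the expected behaviour (paths and cluster setup are assumptions for the example, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class ImplicitDirPermExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      DistributedFileSystem dfs = cluster.getFileSystem();
      // Explicit mkdir: the final directory takes the requested permission
      // (subject to the configured umask).
      dfs.mkdirs(new Path("/a"), new FsPermission((short) 0700));
      // Implicit directory: /a/b does not exist yet, so create() makes it.
      // With inheritPerms=true it is expected to take /a's permission
      // rather than the default directory permission.
      FSDataOutputStream out = dfs.create(new Path("/a/b/file1"));
      out.close();
      System.out.println("perm of /a/b: "
          + dfs.getFileStatus(new Path("/a/b")).getPermission());
    } finally {
      cluster.shutdown();
    }
  }
}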

FSNamesystem.java

@@ -1787,7 +1787,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Override
public boolean hasWriteLock() {
return this.fsLock.isWriteLockedByCurrentThread() ||
- fsLock.haswWriteChildLock();
+ fsLock.hasWriteChildLock();
}
@Override
public boolean hasReadLock() {

FSNamesystemLock.java

@@ -149,7 +149,7 @@ class FSNamesystemLock {
return partitionLocks.get().remove(lock);
}
- boolean haswWriteChildLock() {
+ boolean hasWriteChildLock() {
Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
// FSNamesystem.LOG.debug("partitionLocks.size = {}", partitionLocks.get().size());
while(iter.hasNext()) {

INodeMap.java

@@ -153,7 +153,8 @@ public class INodeMap {
@Override
protected boolean hasWriteChildLock() {
- return this.childLock.isWriteLockedByCurrentThread();
+ return this.childLock.isWriteLockedByCurrentThread() || namesystem
+ .getFSLock().hasWriteChildLock();
}
@Override
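
Taken together, the three hunks above make write-lock ownership checks aware of the partitioned INodeMap locks: a thread that has latched a partition (child) write lock is treated as holding the namesystem write lock, and the misspelled haswWriteChildLock is renamed. A rough, hypothetical sketch of that composition (names and structure are assumptions for illustration, not the FGL implementation):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of a global lock plus per-partition child locks.
class CompositeNamespaceLock {
  private final ReentrantReadWriteLock global = new ReentrantReadWriteLock();
  private final ThreadLocal<Set<ReentrantReadWriteLock>> heldChildLocks =
      ThreadLocal.withInitial(HashSet::new);

  /** Hand-over-hand: take the partition lock, then drop the global one. */
  void latchWriteLock(ReentrantReadWriteLock partition) {
    partition.writeLock().lock();
    heldChildLocks.get().add(partition);
    if (global.isWriteLockedByCurrentThread()) {
      global.writeLock().unlock();
    }
  }

  /** True if this thread holds the global lock or any partition write lock. */
  boolean hasWriteLock() {
    if (global.isWriteLockedByCurrentThread()) {
      return true;
    }
    for (ReentrantReadWriteLock child : heldChildLocks.get()) {
      if (child.isWriteLockedByCurrentThread()) {
        return true;
      }
    }
    return false;
  }
}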

TestDFSMkdirs.java

@@ -25,6 +25,8 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
+ import org.apache.hadoop.fs.LocatedFileStatus;
+ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -152,4 +154,55 @@ public class TestDFSMkdirs {
cluster.shutdown();
}
}
+ @Test
+ public void testMkDirsWithRestart() throws IOException {
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ try {
+ Path dir1 = new Path("/mkdir-1");
+ Path file1 = new Path(dir1, "file1");
+ Path deleteDir = new Path("/deleteDir");
+ Path deleteFile = new Path(dir1, "deleteFile");
+ // Create a dir in root dir, should succeed
+ assertTrue(dfs.mkdir(dir1, FsPermission.getDefault()));
+ dfs.mkdir(deleteDir, FsPermission.getDefault());
+ assertTrue(dfs.exists(deleteDir));
+ dfs.delete(deleteDir, true);
+ assertTrue(!dfs.exists(deleteDir));
+ DFSTestUtil.writeFile(dfs, file1, "hello world");
+ DFSTestUtil.writeFile(dfs, deleteFile, "hello world");
+ int totalFiles = getFileCount(dfs);
+ //Before deletion there are 2 files
+ assertTrue("Incorrect file count", 2 == totalFiles);
+ dfs.delete(deleteFile, false);
+ totalFiles = getFileCount(dfs);
+ //After deletion, left with 1 file
+ assertTrue("Incorrect file count", 1 == totalFiles);
+ cluster.restartNameNodes();
+ dfs = cluster.getFileSystem();
+ assertTrue(dfs.exists(dir1));
+ assertTrue(!dfs.exists(deleteDir));
+ assertTrue(dfs.exists(file1));
+ totalFiles = getFileCount(dfs);
+ assertTrue("Incorrect file count", 1 == totalFiles);
+ } finally {
+ dfs.close();
+ cluster.shutdown();
+ }
+ }
+ private int getFileCount(DistributedFileSystem dfs) throws IOException {
+ RemoteIterator<LocatedFileStatus> fileItr =
+ dfs.listFiles(new Path("/"), true);
+ int totalFiles = 0;
+ while (fileItr.hasNext()) {
+ fileItr.next();
+ totalFiles++;
+ }
+ return totalFiles;
+ }
}

TestFileCreation.java

@@ -1313,6 +1313,10 @@ public class TestFileCreation {
fail();
} catch(FileNotFoundException e) {
FileSystem.LOG.info("Caught Expected FileNotFoundException: ", e);
+ } catch (AssertionError ae) {
+ //FSDirWriteFileOp#completeFile throws AssertError if the given
+ // id/node is not an instance of INodeFile.
+ FileSystem.LOG.info("Caught Expected AssertionError: ", ae);
}
} finally {
IOUtils.closeStream(dfs);