From 5d6ca13c5cfb661caebb78978f4d58e723f031c6 Mon Sep 17 00:00:00 2001 From: caozhiqiang Date: Mon, 5 Jun 2023 19:12:59 +0800 Subject: [PATCH] HDFS-16983. Fix concat operation doesn't honor dfs.permissions.enabled (#5561). Contributed by caozhiqiang. Reviewed-by: zhangshuyan Reviewed-by: He Xiaoqiao Signed-off-by: Ayush Saxena --- .../hdfs/server/namenode/FSDirConcatOp.java | 2 +- .../hdfs/server/namenode/TestHDFSConcat.java | 95 +++++++++++++++++++ 2 files changed, 96 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java index ea5ac38aa86..bc6b66af446 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java @@ -121,7 +121,7 @@ class FSDirConcatOp { for(String src : srcs) { final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE); // permission check for srcs - if (pc != null) { + if (pc != null && fsd.isPermissionEnabled()) { fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java index 1608a84168d..f2e9ec278ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.LambdaTestUtils; @@ -564,4 +566,97 @@ public class TestHDFSConcat { assertEquals(1, dfs.getContentSummary(new Path(dir)).getFileCount()); } + + /** + * Verifies concat with wrong user when dfs.permissions.enabled is false. + * + * @throws IOException + */ + @Test + public void testConcatPermissionEnabled() throws Exception { + Configuration conf2 = new Configuration(); + conf2.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build(); + try { + cluster2.waitClusterUp(); + DistributedFileSystem dfs2 = cluster2.getFileSystem(); + + String testPathDir = "/dir2"; + Path dir = new Path(testPathDir); + dfs2.mkdirs(dir); + Path trg = new Path(testPathDir, "trg"); + Path src = new Path(testPathDir, "src"); + DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1); + DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1); + + // Check permissions with the wrong user when dfs.permissions.enabled is true. + final UserGroupInformation user = + UserGroupInformation.createUserForTesting("theDoctor", new String[] {"tardis"}); + DistributedFileSystem hdfs1 = + (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2); + LambdaTestUtils.intercept(AccessControlException.class, + "Permission denied: user=theDoctor, access=WRITE", + () -> hdfs1.concat(trg, new Path[] {src})); + + conf2.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); + cluster2 = new MiniDFSCluster.Builder(conf2).numDataNodes(REPL_FACTOR).build(); + cluster2.waitClusterUp(); + dfs2 = cluster2.getFileSystem(); + dfs2.mkdirs(dir); + DFSTestUtil.createFile(dfs2, trg, blockSize, REPL_FACTOR, 1); + DFSTestUtil.createFile(dfs2, src, blockSize, REPL_FACTOR, 1); + + // Check permissions with the wrong user when dfs.permissions.enabled is false. + DistributedFileSystem hdfs2 = + (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf2); + hdfs2.concat(trg, new Path[] {src}); + } finally { + if (cluster2 != null) { + cluster2.shutdown(); + } + } + } + + /** + * Test permissions of Concat operation. + */ + @Test + public void testConcatPermissions() throws Exception { + String testPathDir = "/dir"; + Path dir = new Path(testPathDir); + dfs.mkdirs(dir); + dfs.setPermission(dir, new FsPermission((short) 0777)); + + Path dst = new Path(testPathDir, "dst"); + Path src = new Path(testPathDir, "src"); + DFSTestUtil.createFile(dfs, dst, blockSize, REPL_FACTOR, 1); + + // Create a user who is not the owner of the file and try concat operation. + final UserGroupInformation user = + UserGroupInformation.createUserForTesting("theDoctor", new String[] {"group"}); + DistributedFileSystem dfs2 = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(user, conf); + + // Test 1: User is not the owner of the file and has src & dst permission. + DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1); + dfs.setPermission(dst, new FsPermission((short) 0777)); + dfs.setPermission(src, new FsPermission((short) 0777)); + dfs2.concat(dst, new Path[] {src}); + + // Test 2: User is not the owner of the file and has only dst permission. + DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1); + dfs.setPermission(dst, new FsPermission((short) 0777)); + dfs.setPermission(src, new FsPermission((short) 0700)); + LambdaTestUtils.intercept(AccessControlException.class, + "Permission denied: user=theDoctor, access=READ", + () -> dfs2.concat(dst, new Path[] {src})); + + // Test 3: User is not the owner of the file and has only src permission. + DFSTestUtil.createFile(dfs, src, blockSize, REPL_FACTOR, 1); + dfs.setPermission(dst, new FsPermission((short) 0700)); + dfs.setPermission(src, new FsPermission((short) 0777)); + LambdaTestUtils.intercept(AccessControlException.class, + "Permission denied: user=theDoctor, access=WRITE", + () -> dfs2.concat(dst, new Path[] {src})); + } }