diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 6e71331fee..7ac14b1843 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.ipc.RemoteException; @@ -148,7 +149,8 @@ public class PutHDFS extends AbstractHadoopProcessor { public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder() .name("Permissions umask") .description( - "A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode") + "A umask represented as an octal number which determines the permissions of files written to HDFS. " + + "If this property is empty, processor uses fs.permission.umask-mode. If fs.permission.umask-mode is undefined, processor honors FsPermission.DEFAULT_UMASK.") .addValidator(HadoopValidators.UMASK_VALIDATOR) .build(); @@ -208,9 +210,8 @@ public class PutHDFS extends AbstractHadoopProcessor { if (umaskProp.isSet()) { dfsUmask = Short.parseShort(umaskProp.getValue(), 8); } else { - dfsUmask = FsPermission.DEFAULT_UMASK; + dfsUmask = FsPermission.getUMask(config).toShort(); } - FsPermission.setUMask(config, new FsPermission(dfsUmask)); } @@ -223,6 +224,7 @@ public class PutHDFS extends AbstractHadoopProcessor { final FileSystem hdfs = getFileSystem(); final Configuration configuration = getConfiguration(); + final UserGroupInformation ugi = getUserGroupInformation(); if (configuration == null || hdfs == null || ugi == null) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 2334730af4..6f74766ae8 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -368,6 +368,59 @@ public class PutHDFSTest { runner.assertNotValid(); } + @Test + public void testPutFilePermissionsWithProcessorConfiguredUmask() throws FileNotFoundException, IOException { + // assert the file permission is the same value as processor's property + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + runner.setProperty(PutHDFS.UMASK, "777"); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + assertEquals(FsPermission.getFileDefault().applyUMask(new FsPermission("777")), mockFileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission()); + } + + @Test + public void testPutFilePermissionsWithXmlConfiguredUmask() throws FileNotFoundException, IOException { + // assert the file permission is the same value as xml + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + runner.setProperty(PutHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-perms.xml"); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + assertEquals(FsPermission.getFileDefault().applyUMask(new FsPermission("777")), mockFileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission()); + } + + @Test + public void testPutFIlePermissionsWithNoConfiguredUmask() throws FileNotFoundException, IOException { + // assert the file permission fallback works. It should read FsPermission.DEFAULT_UMASK + PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) { + Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + assertEquals( + FsPermission.getFileDefault().applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)), + mockFileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission() + ); + } + private class TestablePutHDFS extends PutHDFS { private KerberosProperties testKerberosProperties; @@ -410,7 +463,7 @@ public class PutHDFSTest { @Override public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) { - pathToStatus.put(f, newFile(f)); + pathToStatus.put(f, newFile(f, permission)); return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics("")); } @@ -477,8 +530,8 @@ public class PutHDFSTest { return pathToStatus.containsKey(f); } - private FileStatus newFile(Path p) { - return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0644), "owner", "group", p); + private FileStatus newFile(Path p, FsPermission permission) { + return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p); } private FileStatus newDir(Path p) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-perms.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-perms.xml new file mode 100644 index 0000000000..9ea36254a2 --- /dev/null +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/resources/core-site-perms.xml @@ -0,0 +1,29 @@ + + + + + + + + + fs.defaultFS + file:/// + + + fs.permissions.umask-mode + 777 + +