mirror of https://github.com/apache/nifi.git
NIFI-5575 Make PutHDFS check fs.permissions.umask-mode if property "Permission umask" is empty.
This commit is contained in:
parent
2938454ae4
commit
7a763d3c49
|
@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.permission.FsCreateModes;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -148,7 +149,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
||||||
public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor UMASK = new PropertyDescriptor.Builder()
|
||||||
.name("Permissions umask")
|
.name("Permissions umask")
|
||||||
.description(
|
.description(
|
||||||
"A umask represented as an octal number which determines the permissions of files written to HDFS. This overrides the Hadoop Configuration dfs.umaskmode")
|
"A umask represented as an octal number which determines the permissions of files written to HDFS. "
|
||||||
|
+ "If this property is empty, processor uses fs.permission.umask-mode. If fs.permission.umask-mode is undefined, processor honors FsPermission.DEFAULT_UMASK.")
|
||||||
.addValidator(HadoopValidators.UMASK_VALIDATOR)
|
.addValidator(HadoopValidators.UMASK_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
@ -208,9 +210,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
||||||
if (umaskProp.isSet()) {
|
if (umaskProp.isSet()) {
|
||||||
dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
|
dfsUmask = Short.parseShort(umaskProp.getValue(), 8);
|
||||||
} else {
|
} else {
|
||||||
dfsUmask = FsPermission.DEFAULT_UMASK;
|
dfsUmask = FsPermission.getUMask(config).toShort();
|
||||||
}
|
}
|
||||||
|
|
||||||
FsPermission.setUMask(config, new FsPermission(dfsUmask));
|
FsPermission.setUMask(config, new FsPermission(dfsUmask));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -223,6 +224,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
||||||
|
|
||||||
final FileSystem hdfs = getFileSystem();
|
final FileSystem hdfs = getFileSystem();
|
||||||
final Configuration configuration = getConfiguration();
|
final Configuration configuration = getConfiguration();
|
||||||
|
|
||||||
final UserGroupInformation ugi = getUserGroupInformation();
|
final UserGroupInformation ugi = getUserGroupInformation();
|
||||||
|
|
||||||
if (configuration == null || hdfs == null || ugi == null) {
|
if (configuration == null || hdfs == null || ugi == null) {
|
||||||
|
|
|
@ -368,6 +368,59 @@ public class PutHDFSTest {
|
||||||
runner.assertNotValid();
|
runner.assertNotValid();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutFilePermissionsWithProcessorConfiguredUmask() throws FileNotFoundException, IOException {
|
||||||
|
// assert the file permission is the same value as processor's property
|
||||||
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
||||||
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
|
runner.setProperty(PutHDFS.UMASK, "777");
|
||||||
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
|
runner.enqueue(fis, attributes);
|
||||||
|
runner.run();
|
||||||
|
}
|
||||||
|
assertEquals(FsPermission.getFileDefault().applyUMask(new FsPermission("777")), mockFileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutFilePermissionsWithXmlConfiguredUmask() throws FileNotFoundException, IOException {
|
||||||
|
// assert the file permission is the same value as xml
|
||||||
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
||||||
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
|
runner.setProperty(PutHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site-perms.xml");
|
||||||
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
|
runner.enqueue(fis, attributes);
|
||||||
|
runner.run();
|
||||||
|
}
|
||||||
|
assertEquals(FsPermission.getFileDefault().applyUMask(new FsPermission("777")), mockFileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPutFIlePermissionsWithNoConfiguredUmask() throws FileNotFoundException, IOException {
|
||||||
|
// assert the file permission fallback works. It should read FsPermission.DEFAULT_UMASK
|
||||||
|
PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
|
||||||
|
TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
|
||||||
|
runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
|
||||||
|
try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
|
||||||
|
runner.enqueue(fis, attributes);
|
||||||
|
runner.run();
|
||||||
|
}
|
||||||
|
assertEquals(
|
||||||
|
FsPermission.getFileDefault().applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)),
|
||||||
|
mockFileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private class TestablePutHDFS extends PutHDFS {
|
private class TestablePutHDFS extends PutHDFS {
|
||||||
|
|
||||||
private KerberosProperties testKerberosProperties;
|
private KerberosProperties testKerberosProperties;
|
||||||
|
@ -410,7 +463,7 @@ public class PutHDFSTest {
|
||||||
@Override
|
@Override
|
||||||
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
|
public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication,
|
||||||
final long blockSize, final Progressable progress) {
|
final long blockSize, final Progressable progress) {
|
||||||
pathToStatus.put(f, newFile(f));
|
pathToStatus.put(f, newFile(f, permission));
|
||||||
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
|
return new FSDataOutputStream(new ByteArrayOutputStream(), new Statistics(""));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,8 +530,8 @@ public class PutHDFSTest {
|
||||||
return pathToStatus.containsKey(f);
|
return pathToStatus.containsKey(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileStatus newFile(Path p) {
|
private FileStatus newFile(Path p, FsPermission permission) {
|
||||||
return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, perms((short) 0644), "owner", "group", p);
|
return new FileStatus(100L, false, 3, 128 * 1024 * 1024, 1523456000000L, 1523457000000L, permission, "owner", "group", p);
|
||||||
}
|
}
|
||||||
|
|
||||||
private FileStatus newDir(Path p) {
|
private FileStatus newDir(Path p) {
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
|
||||||
|
<!-- Put site-specific property overrides in this file. -->
|
||||||
|
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>fs.defaultFS</name>
|
||||||
|
<value>file:///</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>fs.permissions.umask-mode</name>
|
||||||
|
<value>777</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
Loading…
Reference in New Issue