mirror of https://github.com/apache/nifi.git
NIFI-9235 - Log conflicts between umask and ACL in PutHDFS
This closes #5409 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
2f36159a21
commit
d6805abf9b
|
@ -78,6 +78,11 @@
|
|||
<version>1.15.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
<version>2.9.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
|
|
|
@ -16,11 +16,15 @@
|
|||
*/
|
||||
package org.apache.nifi.processors.hadoop;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.google.common.base.Throwables;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.AclEntryScope;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsCreateModes;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
|
@ -35,6 +39,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
|
|||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
|
@ -58,7 +64,9 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
|
@ -93,6 +101,10 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
|
||||
protected static final int BUFFER_SIZE_DEFAULT = 4096;
|
||||
|
||||
// state
|
||||
|
||||
private Cache<Path, AclStatus> aclCache;
|
||||
|
||||
// relationships
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
|
@ -152,7 +164,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
.description(
|
||||
"A umask represented as an octal number which determines the permissions of files written to HDFS. " +
|
||||
"This overrides the Hadoop property \"fs.permissions.umask-mode\". " +
|
||||
"If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used.")
|
||||
"If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used. "+
|
||||
"If the PutHDFS target folder has a default ACL defined, the umask property is ignored by HDFS.")
|
||||
.addValidator(HadoopValidators.UMASK_VALIDATOR)
|
||||
.build();
|
||||
|
||||
|
@ -229,6 +242,19 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
FsPermission.setUMask(config, new FsPermission(dfsUmask));
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
aclCache = Caffeine.newBuilder()
|
||||
.maximumSize(20L)
|
||||
.expireAfterWrite(Duration.ofHours(1))
|
||||
.build();
|
||||
}
|
||||
|
||||
@OnStopped
|
||||
public void onStopped() {
|
||||
aclCache.invalidateAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
final FlowFile flowFile = session.get();
|
||||
|
@ -254,7 +280,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
FlowFile putFlowFile = flowFile;
|
||||
try {
|
||||
final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
|
||||
|
||||
checkAclStatus(getAclStatus(dirPath));
|
||||
final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
|
||||
final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
|
||||
final int bufferSize = getBufferSize(context, session, putFlowFile);
|
||||
|
@ -423,6 +449,25 @@ public class PutHDFS extends AbstractHadoopProcessor {
|
|||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void checkAclStatus(final AclStatus aclStatus) throws IOException {
|
||||
final boolean isDefaultACL = aclStatus.getEntries().stream().anyMatch(
|
||||
aclEntry -> AclEntryScope.DEFAULT.equals(aclEntry.getScope()));
|
||||
final boolean isSetUmask = context.getProperty(UMASK).isSet();
|
||||
if (isDefaultACL && isSetUmask) {
|
||||
throw new IOException("PutHDFS umask setting is ignored by HDFS when HDFS default ACL is set.");
|
||||
}
|
||||
}
|
||||
|
||||
private AclStatus getAclStatus(final Path dirPath) {
|
||||
return aclCache.get(dirPath, fn -> {
|
||||
try {
|
||||
return hdfs.getAclStatus(dirPath);
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclStatus;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
@ -43,6 +45,7 @@ import org.ietf.jgss.GSSException;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import javax.security.sasl.SaslException;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -51,6 +54,8 @@ import java.io.FileInputStream;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -61,6 +66,9 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
public class PutHDFSTest {
|
||||
|
||||
|
@ -468,6 +476,89 @@ public class PutHDFSTest {
|
|||
fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission());
|
||||
}
|
||||
|
||||
/**
|
||||
* Multiple invocations of PutHDFS on the same target directory should query the remote filesystem ACL once, and
|
||||
* use the cached ACL afterwards.
|
||||
*/
|
||||
@Test
|
||||
public void testPutHDFSAclCache() {
|
||||
final MockFileSystem fileSystem = Mockito.spy(new MockFileSystem());
|
||||
final Path directory = new Path("/withACL");
|
||||
assertTrue(fileSystem.mkdirs(directory));
|
||||
final String acl = "user::rwx,group::rwx,other::rwx";
|
||||
final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx";
|
||||
fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true));
|
||||
|
||||
final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
|
||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
|
||||
runner.setProperty(PutHDFS.UMASK, "077");
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(CoreAttributes.FILENAME.key(), "empty");
|
||||
runner.enqueue(new byte[16], attributes);
|
||||
runner.run(3); // fetch data once; hit AclCache twice
|
||||
verify(fileSystem, times(1)).getAclStatus(any(Path.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* When no default ACL is present on the remote directory, usage of {@link PutHDFS#UMASK}
|
||||
* should be ok.
|
||||
*/
|
||||
@Test
|
||||
public void testPutFileWithNoDefaultACL() {
|
||||
final List<Boolean> setUmask = Arrays.asList(false, true);
|
||||
for (boolean setUmaskIt : setUmask) {
|
||||
final MockFileSystem fileSystem = new MockFileSystem();
|
||||
final Path directory = new Path("/withNoDACL");
|
||||
assertTrue(fileSystem.mkdirs(directory));
|
||||
final String acl = "user::rwx,group::rwx,other::rwx";
|
||||
fileSystem.setAcl(directory, AclEntry.parseAclSpec(acl, true));
|
||||
|
||||
final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
|
||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
|
||||
if (setUmaskIt) {
|
||||
runner.setProperty(PutHDFS.UMASK, "077");
|
||||
}
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(CoreAttributes.FILENAME.key(), "empty");
|
||||
runner.enqueue(new byte[16], attributes);
|
||||
runner.run();
|
||||
assertEquals(1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size());
|
||||
assertEquals(0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* When default ACL is present on the remote directory, usage of {@link PutHDFS#UMASK}
|
||||
* should trigger failure of the flow file.
|
||||
*/
|
||||
@Test
|
||||
public void testPutFileWithDefaultACL() {
|
||||
final List<Boolean> setUmask = Arrays.asList(false, true);
|
||||
for (boolean setUmaskIt : setUmask) {
|
||||
final MockFileSystem fileSystem = new MockFileSystem();
|
||||
final Path directory = new Path("/withACL");
|
||||
assertTrue(fileSystem.mkdirs(directory));
|
||||
final String acl = "user::rwx,group::rwx,other::rwx";
|
||||
final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx";
|
||||
fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true));
|
||||
|
||||
final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem);
|
||||
final TestRunner runner = TestRunners.newTestRunner(processor);
|
||||
runner.setProperty(PutHDFS.DIRECTORY, directory.toString());
|
||||
if (setUmaskIt) {
|
||||
runner.setProperty(PutHDFS.UMASK, "077");
|
||||
}
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put(CoreAttributes.FILENAME.key(), "empty");
|
||||
runner.enqueue(new byte[16], attributes);
|
||||
runner.run();
|
||||
assertEquals(setUmaskIt ? 0 : 1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size());
|
||||
assertEquals(setUmaskIt ? 1 : 0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutFileWithCloseException() throws IOException {
|
||||
mockFileSystem = new MockFileSystem(true);
|
||||
|
@ -522,8 +613,9 @@ public class PutHDFSTest {
|
|||
}
|
||||
}
|
||||
|
||||
private class MockFileSystem extends FileSystem {
|
||||
private static class MockFileSystem extends FileSystem {
|
||||
private final Map<Path, FileStatus> pathToStatus = new HashMap<>();
|
||||
private final Map<Path, List<AclEntry>> pathToAcl = new HashMap<>();
|
||||
private final boolean failOnClose;
|
||||
|
||||
public MockFileSystem() {
|
||||
|
@ -534,6 +626,15 @@ public class PutHDFSTest {
|
|||
this.failOnClose = failOnClose;
|
||||
}
|
||||
|
||||
public void setAcl(final Path path, final List<AclEntry> aclSpec) {
|
||||
pathToAcl.put(path, aclSpec);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AclStatus getAclStatus(final Path path) {
|
||||
return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return URI.create("file:///");
|
||||
|
|
Loading…
Reference in New Issue