From d6805abf9bc9d32c138f12fdc33e39da8e1c5fff Mon Sep 17 00:00:00 2001 From: Paul Grey Date: Tue, 21 Sep 2021 16:17:24 -0400 Subject: [PATCH] NIFI-9235 - Log conflicts between umask and ACL in PutHDFS This closes #5409 Signed-off-by: David Handermann --- .../nifi-hdfs-processors/pom.xml | 5 + .../nifi/processors/hadoop/PutHDFS.java | 49 ++++++++- .../nifi/processors/hadoop/PutHDFSTest.java | 103 +++++++++++++++++- 3 files changed, 154 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml index 3d8911d69c..cff199aa29 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/pom.xml @@ -78,6 +78,11 @@ 1.15.0-SNAPSHOT test + + com.github.ben-manes.caffeine + caffeine + 2.9.2 + commons-io commons-io diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 0a32930180..462033da14 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -16,11 +16,15 @@ */ package org.apache.nifi.processors.hadoop; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.base.Throwables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntryScope; +import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsCreateModes; import 
org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; @@ -35,6 +39,8 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -58,7 +64,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.security.PrivilegedAction; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -93,6 +101,10 @@ public class PutHDFS extends AbstractHadoopProcessor { protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size"; protected static final int BUFFER_SIZE_DEFAULT = 4096; + // state + + private Cache aclCache; + // relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -152,7 +164,8 @@ public class PutHDFS extends AbstractHadoopProcessor { .description( "A umask represented as an octal number which determines the permissions of files written to HDFS. " + "This overrides the Hadoop property \"fs.permissions.umask-mode\". " + - "If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used.") + "If this property and \"fs.permissions.umask-mode\" are undefined, the Hadoop default \"022\" will be used. 
"+ + "If the PutHDFS target folder has a default ACL defined, the umask property is ignored by HDFS.") .addValidator(HadoopValidators.UMASK_VALIDATOR) .build(); @@ -229,6 +242,19 @@ public class PutHDFS extends AbstractHadoopProcessor { FsPermission.setUMask(config, new FsPermission(dfsUmask)); } + @OnScheduled + public void onScheduled(final ProcessContext context) { + aclCache = Caffeine.newBuilder() + .maximumSize(20L) + .expireAfterWrite(Duration.ofHours(1)) + .build(); + } + + @OnStopped + public void onStopped() { + aclCache.invalidateAll(); + } + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { final FlowFile flowFile = session.get(); @@ -254,7 +280,7 @@ public class PutHDFS extends AbstractHadoopProcessor { FlowFile putFlowFile = flowFile; try { final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile); - + checkAclStatus(getAclStatus(dirPath)); final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue(); final long blockSize = getBlockSize(context, session, putFlowFile, dirPath); final int bufferSize = getBufferSize(context, session, putFlowFile); @@ -423,6 +449,25 @@ public class PutHDFS extends AbstractHadoopProcessor { return null; } + + private void checkAclStatus(final AclStatus aclStatus) throws IOException { + final boolean isDefaultACL = aclStatus.getEntries().stream().anyMatch( + aclEntry -> AclEntryScope.DEFAULT.equals(aclEntry.getScope())); + final boolean isSetUmask = context.getProperty(UMASK).isSet(); + if (isDefaultACL && isSetUmask) { + throw new IOException("PutHDFS umask setting is ignored by HDFS when HDFS default ACL is set."); + } + } + + private AclStatus getAclStatus(final Path dirPath) { + return aclCache.get(dirPath, fn -> { + try { + return hdfs.getAclStatus(dirPath); + } catch (IOException e) { + throw new UncheckedIOException(String.format("Unable to query ACL for directory [%s]", dirPath), e); + } + }); + } }); } diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 945e24e6b1..abe288dea9 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -22,6 +22,8 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.util.Progressable; @@ -43,6 +45,7 @@ import org.ietf.jgss.GSSException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import javax.security.sasl.SaslException; import java.io.ByteArrayOutputStream; @@ -51,6 +54,8 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -61,6 +66,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class PutHDFSTest { @@ -468,6 +476,89 @@ public class PutHDFSTest { fileSystem.getFileStatus(new Path("target/test-classes/randombytes-1")).getPermission()); } + /** + * Multiple 
invocations of PutHDFS on the same target directory should query the remote filesystem ACL once, and + * use the cached ACL afterwards. + */ + @Test + public void testPutHDFSAclCache() { + final MockFileSystem fileSystem = Mockito.spy(new MockFileSystem()); + final Path directory = new Path("/withACL"); + assertTrue(fileSystem.mkdirs(directory)); + final String acl = "user::rwx,group::rwx,other::rwx"; + final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx"; + fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true)); + + final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem); + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHDFS.DIRECTORY, directory.toString()); + runner.setProperty(PutHDFS.UMASK, "077"); + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "empty"); + runner.enqueue(new byte[16], attributes); + runner.run(3); // fetch data once; hit AclCache twice + verify(fileSystem, times(1)).getAclStatus(any(Path.class)); + } + + /** + * When no default ACL is present on the remote directory, usage of {@link PutHDFS#UMASK} + * should be ok. 
+ */ + @Test + public void testPutFileWithNoDefaultACL() { + final List setUmask = Arrays.asList(false, true); + for (boolean setUmaskIt : setUmask) { + final MockFileSystem fileSystem = new MockFileSystem(); + final Path directory = new Path("/withNoDACL"); + assertTrue(fileSystem.mkdirs(directory)); + final String acl = "user::rwx,group::rwx,other::rwx"; + fileSystem.setAcl(directory, AclEntry.parseAclSpec(acl, true)); + + final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem); + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHDFS.DIRECTORY, directory.toString()); + if (setUmaskIt) { + runner.setProperty(PutHDFS.UMASK, "077"); + } + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "empty"); + runner.enqueue(new byte[16], attributes); + runner.run(); + assertEquals(1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size()); + assertEquals(0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size()); + } + } + + /** + * When a default ACL is present on the remote directory, usage of {@link PutHDFS#UMASK} + * should trigger failure of the flow file. 
+ */ + @Test + public void testPutFileWithDefaultACL() { + final List setUmask = Arrays.asList(false, true); + for (boolean setUmaskIt : setUmask) { + final MockFileSystem fileSystem = new MockFileSystem(); + final Path directory = new Path("/withACL"); + assertTrue(fileSystem.mkdirs(directory)); + final String acl = "user::rwx,group::rwx,other::rwx"; + final String aclDefault = "default:user::rwx,default:group::rwx,default:other::rwx"; + fileSystem.setAcl(directory, AclEntry.parseAclSpec(String.join(",", acl, aclDefault), true)); + + final PutHDFS processor = new TestablePutHDFS(kerberosProperties, fileSystem); + final TestRunner runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHDFS.DIRECTORY, directory.toString()); + if (setUmaskIt) { + runner.setProperty(PutHDFS.UMASK, "077"); + } + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.FILENAME.key(), "empty"); + runner.enqueue(new byte[16], attributes); + runner.run(); + assertEquals(setUmaskIt ? 0 : 1, runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS).size()); + assertEquals(setUmaskIt ? 
1 : 0, runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE).size()); + } + } + @Test public void testPutFileWithCloseException() throws IOException { mockFileSystem = new MockFileSystem(true); @@ -522,8 +613,9 @@ public class PutHDFSTest { } } - private class MockFileSystem extends FileSystem { + private static class MockFileSystem extends FileSystem { private final Map pathToStatus = new HashMap<>(); + private final Map> pathToAcl = new HashMap<>(); private final boolean failOnClose; public MockFileSystem() { @@ -534,6 +626,15 @@ public class PutHDFSTest { this.failOnClose = failOnClose; } + public void setAcl(final Path path, final List aclSpec) { + pathToAcl.put(path, aclSpec); + } + + @Override + public AclStatus getAclStatus(final Path path) { + return new AclStatus.Builder().addEntries(pathToAcl.getOrDefault(path, new ArrayList<>())).build(); + } + @Override public URI getUri() { return URI.create("file:///");