diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 69b5b77da5..41ddf597d7 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -145,6 +145,7 @@ public class PutHDFS extends AbstractHadoopProcessor { .description( "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder() @@ -152,6 +153,7 @@ public class PutHDFS extends AbstractHadoopProcessor { .description( "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); private static final Set relationships; @@ -259,7 +261,7 @@ public class PutHDFS extends AbstractHadoopProcessor { if (!hdfs.mkdirs(configuredRootDirPath)) { throw new IOException(configuredRootDirPath.toString() + " could not be created"); } - changeOwner(context, hdfs, configuredRootDirPath); + changeOwner(context, hdfs, configuredRootDirPath, flowFile); } final boolean destinationExists = hdfs.exists(copyFile); @@ -352,7 +354,7 @@ public class PutHDFS extends AbstractHadoopProcessor { + " to its final filename"); } - changeOwner(context, hdfs, copyFile); + changeOwner(context, hdfs, copyFile, flowFile); } getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", @@ -386,11 +388,15 @@ public class PutHDFS extends AbstractHadoopProcessor { }); } - protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) { + protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) { try { // Change owner and group of file if configured to do so - String owner = context.getProperty(REMOTE_OWNER).getValue(); - String group = context.getProperty(REMOTE_GROUP).getValue(); + String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue(); + String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue(); + + owner = owner == null || owner.isEmpty() ? null : owner; + group = group == null || group.isEmpty() ? null : group; + if (owner != null || group != null) { hdfs.setOwner(name, owner, group); } diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 482ba69718..2d3ad791b4 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.processor.ProcessContext; @@ -268,7 +269,7 @@ public class PutHDFSTest { final KerberosProperties testKerberosProperties = kerberosProperties; TestRunner runner = TestRunners.newTestRunner(new PutHDFS() { @Override - protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name) { + protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name, FlowFile flowFile) { throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling"); }