NIFI-4184: Enabled EL on PutHDFS Remote Group and Owner

I needed to use some FlowFile attributes in REMOTE_GROUP and REMOTE_OWNER. To achieve this, I added expressionLanguageSupported(true) to the PropertyDescriptors of REMOTE_GROUP and REMOTE_OWNER.

This closes #2007.

Signed-off-by: Davide <davidde85@hotmail.it>
Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Davide 2017-07-13 16:03:54 +02:00 committed by Koji Kawamura
parent b0be99036d
commit d334532b16
2 changed files with 13 additions and 6 deletions

View File

@@ -145,6 +145,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
.description( .description(
"Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner") "Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder() public static final PropertyDescriptor REMOTE_GROUP = new PropertyDescriptor.Builder()
@@ -152,6 +153,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
.description( .description(
"Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group") "Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build(); .build();
private static final Set<Relationship> relationships; private static final Set<Relationship> relationships;
@@ -259,7 +261,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
if (!hdfs.mkdirs(configuredRootDirPath)) { if (!hdfs.mkdirs(configuredRootDirPath)) {
throw new IOException(configuredRootDirPath.toString() + " could not be created"); throw new IOException(configuredRootDirPath.toString() + " could not be created");
} }
changeOwner(context, hdfs, configuredRootDirPath); changeOwner(context, hdfs, configuredRootDirPath, flowFile);
} }
final boolean destinationExists = hdfs.exists(copyFile); final boolean destinationExists = hdfs.exists(copyFile);
@@ -352,7 +354,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
+ " to its final filename"); + " to its final filename");
} }
changeOwner(context, hdfs, copyFile); changeOwner(context, hdfs, copyFile, flowFile);
} }
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
@@ -386,11 +388,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
}); });
} }
protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name) { protected void changeOwner(final ProcessContext context, final FileSystem hdfs, final Path name, final FlowFile flowFile) {
try { try {
// Change owner and group of file if configured to do so // Change owner and group of file if configured to do so
String owner = context.getProperty(REMOTE_OWNER).getValue(); String owner = context.getProperty(REMOTE_OWNER).evaluateAttributeExpressions(flowFile).getValue();
String group = context.getProperty(REMOTE_GROUP).getValue(); String group = context.getProperty(REMOTE_GROUP).evaluateAttributeExpressions(flowFile).getValue();
owner = owner == null || owner.isEmpty() ? null : owner;
group = group == null || group.isEmpty() ? null : group;
if (owner != null || group != null) { if (owner != null || group != null) {
hdfs.setOwner(name, owner, group); hdfs.setOwner(name, owner, group);
} }

View File

@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@@ -268,7 +269,7 @@ public class PutHDFSTest {
final KerberosProperties testKerberosProperties = kerberosProperties; final KerberosProperties testKerberosProperties = kerberosProperties;
TestRunner runner = TestRunners.newTestRunner(new PutHDFS() { TestRunner runner = TestRunners.newTestRunner(new PutHDFS() {
@Override @Override
protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name) { protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name, FlowFile flowFile) {
throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling"); throw new ProcessException("Forcing Exception to get thrown in order to verify proper handling");
} }