diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java index 2cec8665c5..378dd700d4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -252,6 +252,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { getConfigurationFromResources(config, configResources); + // give sub-classes a chance to process configuration + preProcessConfiguration(config, context); + // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout checkHdfsUriForTimeout(config); @@ -287,6 +290,17 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor { return new HdfsResources(config, fs, ugi); } + /** + * This method will be called after the Configuration has been created, but before the FileSystem is created, + * allowing sub-classes to take further action on the Configuration before creating the FileSystem. + * + * @param config the Configuration that will be used to create the FileSystem + * @param context the context that can be used to retrieve additional values + */ + protected void preProcessConfiguration(final Configuration config, final ProcessContext context) { + + } + /** * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received * diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java index 70a3697c27..e08b4fb367 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java @@ -210,14 +210,8 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { return putHdfsRecordProperties; } - - @OnScheduled - public final void onScheduled(final ProcessContext context) throws IOException { - super.abstractOnScheduled(context); - - this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue(); - this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue(); - + @Override + protected void preProcessConfiguration(Configuration config, ProcessContext context) { // Set umask once, to avoid thread safety issues doing it in onTrigger final PropertyValue umaskProp = context.getProperty(UMASK); final short dfsUmask; @@ -226,8 +220,16 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor { } else { dfsUmask = FsPermission.DEFAULT_UMASK; } - final Configuration conf = getConfiguration(); - FsPermission.setUMask(conf, new FsPermission(dfsUmask)); + + FsPermission.setUMask(config, new FsPermission(dfsUmask)); + } + + @OnScheduled + public final void onScheduled(final ProcessContext context) throws IOException { + super.abstractOnScheduled(context); + + this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue(); + this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue(); } /** diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index 41ddf597d7..4a9b2c1dba 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -32,7 +32,6 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -188,10 +187,8 @@ public class PutHDFS extends AbstractHadoopProcessor { return props; } - @OnScheduled - public void onScheduled(ProcessContext context) throws Exception { - super.abstractOnScheduled(context); - + @Override + protected void preProcessConfiguration(final Configuration config, final ProcessContext context) { // Set umask once, to avoid thread safety issues doing it in onTrigger final PropertyValue umaskProp = context.getProperty(UMASK); final short dfsUmask; @@ -200,8 +197,8 @@ public class PutHDFS extends AbstractHadoopProcessor { } else { dfsUmask = FsPermission.DEFAULT_UMASK; } - final Configuration conf = getConfiguration(); - FsPermission.setUMask(conf, new FsPermission(dfsUmask)); + + FsPermission.setUMask(config, new FsPermission(dfsUmask)); } @Override