diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 51f6a8241c..116caba8e9 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -17,8 +17,10 @@
 package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
@@ -61,6 +63,7 @@ import java.io.OutputStream;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -170,6 +173,17 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
 
+    public static final PropertyDescriptor IGNORE_LOCALITY = new PropertyDescriptor.Builder()
+            .name("Ignore Locality")
+            .displayName("Ignore Locality")
+            .description(
+                    "Directs the HDFS system to ignore locality rules so that data is distributed randomly throughout the cluster")
+            .required(false)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     private static final Set<Relationship> relationships;
 
     static {
@@ -199,6 +213,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
         props.add(REMOTE_OWNER);
         props.add(REMOTE_GROUP);
         props.add(COMPRESSION_CODEC);
+        props.add(IGNORE_LOCALITY);
         return props;
     }
 
@@ -313,8 +328,18 @@ public class PutHDFS extends AbstractHadoopProcessor {
                             if (conflictResponse.equals(APPEND_RESOLUTION_AV.getValue()) && destinationExists) {
                                 fos = hdfs.append(copyFile, bufferSize);
                             } else {
-                                fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                                final EnumSet<CreateFlag> cflags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+
+                                final Boolean ignoreLocality = context.getProperty(IGNORE_LOCALITY).asBoolean();
+                                if (ignoreLocality) {
+                                    cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
+                                }
+
+                                fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                        FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
+                                        null, null);
                             }
+
                             if (codec != null) {
                                 fos = codec.createOutputStream(fos);
                             }