diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 3a0cb48a6a..cb49d59d90 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -74,9 +75,15 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso(GetHDFS.class)
 public class PutHDFS extends AbstractHadoopProcessor {
 
-    public static final String REPLACE_RESOLUTION = "replace";
-    public static final String IGNORE_RESOLUTION = "ignore";
-    public static final String FAIL_RESOLUTION = "fail";
+    public static final String REPLACE = "replace";
+    public static final String IGNORE = "ignore";
+    public static final String FAIL = "fail";
+    public static final String APPEND = "append";
+
+    public static final AllowableValue REPLACE_RESOLUTION = new AllowableValue(REPLACE, REPLACE, "Replaces the existing file if any.");
+    public static final AllowableValue IGNORE_RESOLUTION = new AllowableValue(IGNORE, IGNORE, "Ignores the flow file and routes it to success.");
+    public static final AllowableValue FAIL_RESOLUTION = new AllowableValue(FAIL, FAIL, "Penalizes the flow file and routes it to failure.");
+    public static final AllowableValue APPEND_RESOLUTION = new AllowableValue(APPEND, APPEND, "Appends to the existing file if any, creates a new file otherwise.");
 
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
     public static final int BUFFER_SIZE_DEFAULT = 4096;
@@ -101,8 +108,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .name("Conflict Resolution Strategy")
             .description("Indicates what should happen when a file with the same name already exists in the output directory")
             .required(true)
-            .defaultValue(FAIL_RESOLUTION)
-            .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION)
+            .defaultValue(FAIL_RESOLUTION.getValue())
+            .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION, APPEND_RESOLUTION)
             .build();
 
     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
@@ -246,21 +253,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 changeOwner(context, hdfs, configuredRootDirPath);
             }
 
+            final boolean destinationExists = hdfs.exists(copyFile);
+
             // If destination file already exists, resolve that based on processor configuration
-            if (hdfs.exists(copyFile)) {
+            if (destinationExists) {
                 switch (conflictResponse) {
-                    case REPLACE_RESOLUTION:
+                    case REPLACE:
                         if (hdfs.delete(copyFile, false)) {
                             getLogger().info("deleted {} in order to replace with the contents of {}",
                                     new Object[]{copyFile, flowFile});
                         }
                         break;
-                    case IGNORE_RESOLUTION:
+                    case IGNORE:
                         session.transfer(flowFile, REL_SUCCESS);
                         getLogger().info("transferring {} to success because file with same name already exists",
                                 new Object[]{flowFile});
                         return;
-                    case FAIL_RESOLUTION:
+                    case FAIL:
                         flowFile = session.penalize(flowFile);
                         session.transfer(flowFile, REL_FAILURE);
                         getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
@@ -280,7 +289,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     OutputStream fos = null;
                     Path createdFile = null;
                     try {
-                        fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                        if (conflictResponse.equals(APPEND_RESOLUTION.getValue()) && destinationExists) {
+                            fos = hdfs.append(copyFile, bufferSize);
+                        } else {
+                            fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                        }
                         if (codec != null) {
                             fos = codec.createOutputStream(fos);
                         }
@@ -315,21 +328,24 @@ public class PutHDFS extends AbstractHadoopProcessor {
             final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
             tempDotCopyFile = tempCopyFile;
 
-            boolean renamed = false;
-            for (int i = 0; i < 10; i++) { // try to rename multiple times.
-                if (hdfs.rename(tempCopyFile, copyFile)) {
-                    renamed = true;
-                    break;// rename was successful
+            if (!conflictResponse.equals(APPEND_RESOLUTION.getValue())
+                    || (conflictResponse.equals(APPEND_RESOLUTION.getValue()) && !destinationExists)) {
+                boolean renamed = false;
+                for (int i = 0; i < 10; i++) { // try to rename multiple times.
+                    if (hdfs.rename(tempCopyFile, copyFile)) {
+                        renamed = true;
+                        break;// rename was successful
+                    }
+                    Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
+                }
+                if (!renamed) {
+                    hdfs.delete(tempCopyFile, false);
+                    throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
+                            + " to its final filename");
                 }
-                Thread.sleep(200L);// try waiting to let whatever might cause rename failure to resolve
-            }
-            if (!renamed) {
-                hdfs.delete(tempCopyFile, false);
-                throw new ProcessException("Copied file to HDFS but could not rename dot file " + tempCopyFile
-                        + " to its final filename");
-            }
 
-            changeOwner(context, hdfs, copyFile);
+                changeOwner(context, hdfs, copyFile);
+            }
 
             getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                     new Object[]{flowFile, copyFile, millis, dataRate});
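
To see the new write-path decision in isolation: when the strategy is "append" and the destination already exists, the patch opens the final file with FileSystem.append and bypasses the temp-dot-file-plus-rename protocol; every other case, including "append" when no file exists yet, still goes through hdfs.create on the temp file. Below is a minimal, self-contained sketch of that decision, assuming an HDFS client configured with append support; the class name, the helper openDestination, and the paths are illustrative only and are not part of the patch.

// AppendOrCreateSketch.java -- a minimal sketch, not part of the patch.
// Assumes fs.defaultFS points at an HDFS cluster with append enabled;
// the class, helper, and paths below are hypothetical.
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendOrCreateSketch {

    // Mirrors the patched write path: the "append" strategy with an existing
    // destination opens the final file directly; every other case writes a
    // temporary dot file that is renamed into place afterwards.
    static OutputStream openDestination(final FileSystem hdfs, final Path copyFile,
            final Path tempCopyFile, final String conflictResponse, final int bufferSize,
            final short replication, final long blockSize) throws IOException {
        final boolean destinationExists = hdfs.exists(copyFile);
        if ("append".equals(conflictResponse) && destinationExists) {
            return hdfs.append(copyFile, bufferSize); // no temp file, no rename step
        }
        return hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
    }

    public static void main(final String[] args) throws IOException {
        final FileSystem hdfs = FileSystem.get(new Configuration());
        final Path copyFile = new Path("/tmp/example.txt");      // hypothetical destination
        final Path tempCopyFile = new Path("/tmp/.example.txt"); // hypothetical dot file
        try (OutputStream out = openDestination(hdfs, copyFile, tempCopyFile,
                "append", 4096, (short) 1, 64L * 1024L * 1024L)) {
            out.write("one more line\n".getBytes(StandardCharsets.UTF_8));
        }
    }
}

Because the append branch writes to copyFile directly, there is no dot file to rename, which is exactly why the final hunk of the patch skips the rename loop and the changeOwner call in that case.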
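
One possible simplification in the final hunk: by boolean absorption, the guard

    if (!conflictResponse.equals(APPEND_RESOLUTION.getValue())
            || (conflictResponse.equals(APPEND_RESOLUTION.getValue()) && !destinationExists)) {

is equivalent to

    if (!(conflictResponse.equals(APPEND_RESOLUTION.getValue()) && destinationExists)) {

which states the intent directly: rename the dot file and change its owner unless the content was appended to an existing destination.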