NIFI-1322 - PutHDFS - allow file append resolution

Pierre Villard 2016-11-02 20:38:52 +01:00 committed by Oleg Zhurakousky
parent 50010fb340
commit a7d06412f8
1 changed file with 39 additions and 23 deletions


@@ -31,6 +31,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -74,9 +75,15 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso(GetHDFS.class)
 public class PutHDFS extends AbstractHadoopProcessor {
 
-    public static final String REPLACE_RESOLUTION = "replace";
-    public static final String IGNORE_RESOLUTION = "ignore";
-    public static final String FAIL_RESOLUTION = "fail";
+    public static final String REPLACE = "replace";
+    public static final String IGNORE = "ignore";
+    public static final String FAIL = "fail";
+    public static final String APPEND = "append";
+
+    public static final AllowableValue REPLACE_RESOLUTION = new AllowableValue(REPLACE, REPLACE, "Replaces the existing file if any.");
+    public static final AllowableValue IGNORE_RESOLUTION = new AllowableValue(IGNORE, IGNORE, "Ignores the flow file and routes it to success.");
+    public static final AllowableValue FAIL_RESOLUTION = new AllowableValue(FAIL, FAIL, "Penalizes the flow file and routes it to failure.");
+    public static final AllowableValue APPEND_RESOLUTION = new AllowableValue(APPEND, APPEND, "Appends to the existing file if any, creates a new file otherwise.");
 
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
     public static final int BUFFER_SIZE_DEFAULT = 4096;
@@ -101,8 +108,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .name("Conflict Resolution Strategy")
             .description("Indicates what should happen when a file with the same name already exists in the output directory")
             .required(true)
-            .defaultValue(FAIL_RESOLUTION)
-            .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION)
+            .defaultValue(FAIL_RESOLUTION.getValue())
+            .allowableValues(REPLACE_RESOLUTION, IGNORE_RESOLUTION, FAIL_RESOLUTION, APPEND_RESOLUTION)
             .build();
 
     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
@@ -246,21 +253,23 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 changeOwner(context, hdfs, configuredRootDirPath);
             }
 
+            final boolean destinationExists = hdfs.exists(copyFile);
+
             // If destination file already exists, resolve that based on processor configuration
-            if (hdfs.exists(copyFile)) {
+            if (destinationExists) {
                 switch (conflictResponse) {
-                    case REPLACE_RESOLUTION:
+                    case REPLACE:
                         if (hdfs.delete(copyFile, false)) {
                             getLogger().info("deleted {} in order to replace with the contents of {}",
                                     new Object[]{copyFile, flowFile});
                         }
                         break;
-                    case IGNORE_RESOLUTION:
+                    case IGNORE:
                         session.transfer(flowFile, REL_SUCCESS);
                         getLogger().info("transferring {} to success because file with same name already exists",
                                 new Object[]{flowFile});
                         return;
-                    case FAIL_RESOLUTION:
+                    case FAIL:
                         flowFile = session.penalize(flowFile);
                         session.transfer(flowFile, REL_FAILURE);
                         getLogger().warn("penalizing {} and routing to failure because file with same name already exists",
@@ -280,7 +289,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     OutputStream fos = null;
                     Path createdFile = null;
                     try {
-                        fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                        if(conflictResponse.equals(APPEND_RESOLUTION.getValue()) && destinationExists) {
+                            fos = hdfs.append(copyFile, bufferSize);
+                        } else {
+                            fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
+                        }
                         if (codec != null) {
                             fos = codec.createOutputStream(fos);
                         }
@@ -315,6 +328,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                 tempDotCopyFile = tempCopyFile;
 
+                if(!conflictResponse.equals(APPEND_RESOLUTION.getValue())
+                        || (conflictResponse.equals(APPEND_RESOLUTION.getValue()) && !destinationExists)) {
                 boolean renamed = false;
                 for (int i = 0; i < 10; i++) { // try to rename multiple times.
                     if (hdfs.rename(tempCopyFile, copyFile)) {
@@ -330,6 +345,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 }
                 changeOwner(context, hdfs, copyFile);
+                }
 
                 getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                         new Object[]{flowFile, copyFile, millis, dataRate});
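
Taken together, the hunks above implement one rule: the new "append" strategy writes straight into the existing destination file, while every other strategy keeps the original create-temp-then-rename pattern. The condensed sketch below restates that control flow outside the processor. It is illustrative only: the bare FileSystem handle, the byte[] payload, and the write(...) helper are simplified stand-ins for the processor's session-driven I/O, not NiFi code.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Condensed sketch of the append-resolution flow added by this commit.
// All names and parameters here are illustrative stand-ins, not processor fields.
class AppendResolutionSketch {

    static void write(final FileSystem hdfs, final Path copyFile, final Path tempCopyFile,
                      final byte[] content, final int bufferSize, final short replication,
                      final long blockSize, final String conflictResponse) throws IOException {

        // Check existence once up front and reuse the result, as the commit does.
        final boolean destinationExists = hdfs.exists(copyFile);
        final boolean appendInPlace = "append".equals(conflictResponse) && destinationExists;

        final OutputStream fos = appendInPlace
                // Append directly to the existing destination file...
                ? hdfs.append(copyFile, bufferSize)
                // ...otherwise write to a temp file first (pre-existing behavior).
                : hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
        try {
            fos.write(content);
        } finally {
            fos.close();
        }

        // The temp -> destination rename only applies when we did not append in place.
        if (!appendInPlace && !hdfs.rename(tempCopyFile, copyFile)) {
            throw new IOException("Could not rename " + tempCopyFile + " to " + copyFile);
        }
    }
}

One detail worth noting: the commit's rename guard, !append || (append && !destinationExists), reduces by Boolean algebra to !(append && destinationExists), i.e. "skip the rename exactly when the append branch was taken", which is how the sketch phrases it via the single appendInPlace flag.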