From b32c70c419d794a84e2ff602c0b16d511ec3e8be Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Wed, 1 Jun 2016 13:16:10 -0400
Subject: [PATCH] NIFI-1929: Improvements for PutHDFS attribute handling

This closes #486.
---
 .../nifi/processors/hadoop/PutHDFS.java     | 15 +++++++-
 .../nifi/processors/hadoop/PutHDFSTest.java | 38 +++++++++++++++++++
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 7c9747873b..f05c2c7458 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -24,7 +24,9 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -64,7 +66,11 @@ import java.util.concurrent.TimeUnit;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "HDFS", "put", "copy", "filesystem"})
 @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")
-@WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
+@ReadsAttribute(attribute = "filename", description = "The name of the file written to HDFS comes from the value of this attribute.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "filename", description = "The name of the file written to HDFS is stored in this attribute."),
+    @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file on HDFS is stored in this attribute.")
+})
 @SeeAlso(GetHDFS.class)
 public class PutHDFS extends AbstractHadoopProcessor {
 
@@ -75,6 +81,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
     public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
     public static final int BUFFER_SIZE_DEFAULT = 4096;
 
+    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
+
     // relationships
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -329,8 +337,13 @@ public class PutHDFS extends AbstractHadoopProcessor {
                         new Object[]{flowFile, copyFile, millis, dataRate});
 
                 final String outputPath = copyFile.toString();
+                final String newFilename = copyFile.getName();
+                final String hdfsPath = copyFile.getParent().toString();
+                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
+                flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
                 final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
"hdfs:/" + outputPath : "hdfs://" + outputPath; session.getProvenanceReporter().send(flowFile, transitUri); + session.transfer(flowFile, REL_SUCCESS); } catch (final Throwable t) { diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 76970eddbd..48e0f89972 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -214,7 +214,45 @@ public class PutHDFSTest { .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); assertTrue(failedFlowFiles.isEmpty()); + List flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + MockFlowFile flowFile = flowFiles.get(0); assertTrue(fs.exists(new Path("target/test-classes/randombytes-1"))); + assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key())); + assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); + } + + @Test + public void testPutFileWithCompression() throws IOException { + // Refer to comment in the BeforeClass method for an explanation + assumeTrue(isNotWindows()); + + PutHDFS proc = new TestablePutHDFS(kerberosProperties); + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + runner.setProperty(PutHDFS.COMPRESSION_CODEC, "GZIP"); + runner.setValidateExpressionUsage(false); + try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");) { + Map attributes = new HashMap(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + } + + Configuration config = new Configuration(); + FileSystem fs = FileSystem.get(config); + + List failedFlowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); + assertTrue(failedFlowFiles.isEmpty()); + + List flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS); + assertEquals(1, flowFiles.size()); + MockFlowFile flowFile = flowFiles.get(0); + assertTrue(fs.exists(new Path("target/test-classes/randombytes-1.gz"))); + assertEquals("randombytes-1.gz", flowFile.getAttribute(CoreAttributes.FILENAME.key())); + assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE)); } @Test