diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
index 3b1cce2646..cd5f76c759 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/FetchHDFS.java
@@ -16,9 +16,13 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -29,6 +33,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -38,6 +43,7 @@ import org.apache.nifi.util.StopWatch;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -83,6 +89,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> props = new ArrayList<>(properties);
         props.add(FILENAME);
+        props.add(COMPRESSION_CODEC);
         return props;
     }
 
@@ -116,10 +123,38 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
+        InputStream stream = null;
+        CompressionCodec codec = null;
+        Configuration conf = getConfiguration();
+        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
+        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+
+        if(inferCompressionCodec) {
+            codec = compressionCodecFactory.getCodec(path);
+        } else if (compressionType != CompressionType.NONE) {
+            codec = getCompressionCodec(context, getConfiguration());
+        }
+
         final URI uri = path.toUri();
         final StopWatch stopWatch = new StopWatch(true);
-        try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
-            flowFile = session.importFrom(inStream, flowFile);
+        try {
+
+            final String outputFilename;
+            final String originalFilename = path.getName();
+            stream = hdfs.open(path, 16384);
+
+            // Check if compression codec is defined (inferred or otherwise)
+            if (codec != null) {
+                stream = codec.createInputStream(stream);
+                outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+            } else {
+                outputFilename = originalFilename;
+            }
+
+            flowFile = session.importFrom(stream, flowFile);
+            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
+
             stopWatch.stop();
             getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
             session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
@@ -133,6 +168,8 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_COMMS_FAILURE);
+        } finally {
+            IOUtils.closeQuietly(stream);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
index 6153980759..8b8b568f27 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestFetchHDFS.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -25,9 +27,13 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -104,6 +110,60 @@ public class TestFetchHDFS {
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testAutomaticDecompression() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testInferCompressionCodecDisabled() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testFileExtensionNotACompressionCodec() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/13545423550275052.zip");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
+
     private static class TestableFetchHDFS extends FetchHDFS {
         private final KerberosProperties testKerberosProps;
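
For context on the AUTOMATIC mode exercised by the tests above, here is a minimal standalone sketch (not part of the patch; the class name is made up for illustration) of how Hadoop's CompressionCodecFactory maps a file extension to a codec. With a default Configuration, a ".gz" suffix resolves to GzipCodec, so FetchHDFS decompresses the stream and strips the extension, while an unregistered suffix such as ".zip" resolves to null, so the content is passed through unchanged, as in the third test.

// CodecInferenceSketch.java -- illustrative only, not part of this patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecInferenceSketch {
    public static void main(String[] args) {
        // With an unmodified Configuration, the factory registers Hadoop's built-in
        // codecs (gzip, bzip2, deflate, ...), keyed by their default file extensions.
        final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

        // ".gz" maps to a registered codec, so AUTOMATIC mode would decompress the
        // stream and remove the extension from the outgoing filename attribute.
        final CompressionCodec gzip = factory.getCodec(new Path("randombytes-1.gz"));
        System.out.println(gzip == null ? "no codec" : gzip.getClass().getSimpleName()); // GzipCodec

        // ".zip" is not a registered codec extension, so getCodec(...) returns null
        // and the content would be fetched as-is.
        final CompressionCodec zip = factory.getCodec(new Path("13545423550275052.zip"));
        System.out.println(zip == null ? "no codec" : zip.getClass().getSimpleName()); // no codec
    }
}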