NIFI-2963 FetchHDFS should support Compression Codec property

This closes #1166
Authored by Pierre Villard on 2016-10-28 12:35:37 +02:00; committed by Oleg Zhurakousky
parent b5550ffcf5
commit 1abd017c35
2 changed files with 100 additions and 3 deletions
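
For readers skimming the diff: the new Compression Codec property drives Hadoop's CompressionCodecFactory. In AUTOMATIC mode the codec is inferred from the file extension, NONE fetches the bytes untouched, and any other value resolves an explicit codec. Below is a minimal standalone sketch of the inference behavior, assuming hadoop-common is on the classpath; the class name and paths are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecInferenceSketch {
    public static void main(String[] args) {
        // The factory registers the codecs known to the Hadoop configuration
        // (e.g. GzipCodec for ".gz", BZip2Codec for ".bz2", DefaultCodec for ".deflate").
        final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

        // AUTOMATIC: the codec is looked up by file extension.
        final CompressionCodec gzip = factory.getCodec(new Path("/tmp/randombytes-1.gz"));
        System.out.println(gzip.getClass().getSimpleName()); // GzipCodec
        System.out.println(gzip.getDefaultExtension());      // ".gz", later stripped from the filename

        // Extensions with no registered codec (e.g. ".zip") yield null,
        // so the file passes through uncompressed and keeps its name.
        System.out.println(factory.getCodec(new Path("/tmp/archive.zip"))); // null
    }
}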

FetchHDFS.java

@@ -16,9 +16,13 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -29,6 +33,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -38,6 +43,7 @@ import org.apache.nifi.util.StopWatch;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -83,6 +89,7 @@ public class FetchHDFS extends AbstractHadoopProcessor {
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> props = new ArrayList<>(properties);
         props.add(FILENAME);
+        props.add(COMPRESSION_CODEC);
         return props;
     }
@@ -116,10 +123,38 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             return;
         }
 
+        InputStream stream = null;
+        CompressionCodec codec = null;
+        Configuration conf = getConfiguration();
+        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
+        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+
+        if(inferCompressionCodec) {
+            codec = compressionCodecFactory.getCodec(path);
+        } else if (compressionType != CompressionType.NONE) {
+            codec = getCompressionCodec(context, getConfiguration());
+        }
+
         final URI uri = path.toUri();
         final StopWatch stopWatch = new StopWatch(true);
-        try (final FSDataInputStream inStream = hdfs.open(path, 16384)) {
-            flowFile = session.importFrom(inStream, flowFile);
+        try {
+            final String outputFilename;
+            final String originalFilename = path.getName();
+            stream = hdfs.open(path, 16384);
+
+            // Check if compression codec is defined (inferred or otherwise)
+            if (codec != null) {
+                stream = codec.createInputStream(stream);
+                outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+            } else {
+                outputFilename = originalFilename;
+            }
+
+            flowFile = session.importFrom(stream, flowFile);
+            flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
+
             stopWatch.stop();
             getLogger().info("Successfully received content from {} for {} in {}", new Object[] {uri, flowFile, stopWatch.getDuration()});
             session.getProvenanceReporter().fetch(flowFile, uri.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
@@ -133,6 +168,8 @@ public class FetchHDFS extends AbstractHadoopProcessor {
             getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[] {uri, flowFile, e});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_COMMS_FAILURE);
+        } finally {
+            IOUtils.closeQuietly(stream);
         }
     }
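
Summarizing the read path above: the processor opens the HDFS stream, wraps it with the resolved codec when one is found, strips the codec's default extension from the filename attribute, and always closes the stream in the finally block. A condensed sketch of that pattern, assuming hadoop-common and commons-io are on the classpath; the fetch helper here is illustrative, not part of the processor.

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

class DecompressOnFetchSketch {
    /** Reads a file from HDFS, decompressing when the extension maps to a known codec. */
    static byte[] fetch(FileSystem hdfs, Path path, Configuration conf) throws IOException {
        InputStream stream = null;
        try {
            stream = hdfs.open(path, 16384);
            final CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
            if (codec != null) {
                stream = codec.createInputStream(stream); // transparent decompression
                // In the processor, the flowfile's filename attribute would drop the
                // codec extension: StringUtils.removeEnd(path.getName(), codec.getDefaultExtension())
            }
            return IOUtils.toByteArray(stream);
        } finally {
            IOUtils.closeQuietly(stream); // mirrors the patch's finally block
        }
    }
}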

TestFetchHDFS.java

@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -25,9 +27,13 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -104,6 +110,60 @@ public class TestFetchHDFS {
         runner.assertAllFlowFilesTransferred(FetchHDFS.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testAutomaticDecompression() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
+
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testInferCompressionCodecDisabled() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/randombytes-1.gz");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "NONE");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testFileExtensionNotACompressionCodec() throws IOException {
+        FetchHDFS proc = new TestableFetchHDFS(kerberosProperties);
+        TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(FetchHDFS.FILENAME, "src/test/resources/testdata/13545423550275052.zip");
+        runner.setProperty(FetchHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.enqueue(new String("trigger flow file"));
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FetchHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
+
     private static class TestableFetchHDFS extends FetchHDFS {
         private final KerberosProperties testKerberosProps;