From dcdfd3dad9f02ec8d85a3e1da2f82c30e20b6dae Mon Sep 17 00:00:00 2001
From: Pierre Villard
Date: Tue, 3 Jan 2017 20:47:56 +0100
Subject: [PATCH] NIFI-1797 - Added compression codec property to
 CreateHadoopSequenceFile processor

This closes: #1387

Signed-off-by: Andre F de Miranda
---
 .../hadoop/CreateHadoopSequenceFile.java      |  28 +++-
 .../hadoop/SequenceFileWriterImpl.java        |   8 +-
 .../hadoop/util/SequenceFileWriter.java       |   4 +-
 .../hadoop/TestCreateHadoopSequenceFile.java  | 149 +++++++++++++++++-
 4 files changed, 176 insertions(+), 13 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 777015208d..20da921879 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -16,7 +16,9 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -31,12 +33,14 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
+import org.apache.nifi.util.StopWatch;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -88,7 +92,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
     }
     // Optional Properties.
     static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
-            .name("compression type")
+            .name("Compression type")
             .description("Type of compression to use when creating Sequence File")
             .allowableValues(SequenceFile.CompressionType.values())
             .build();
@@ -105,6 +109,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         List<PropertyDescriptor> someProps = new ArrayList<>(properties);
         someProps.add(COMPRESSION_TYPE);
+        someProps.add(COMPRESSION_CODEC);
         return someProps;
     }
 
@@ -149,13 +154,28 @@ default:
                 sequenceFileWriter = new SequenceFileWriterImpl();
         }
-        String value = context.getProperty(COMPRESSION_TYPE).getValue();
-        SequenceFile.CompressionType compressionType = value == null
+
+        final Configuration configuration = getConfiguration();
+        if (configuration == null) {
+            getLogger().error("HDFS not configured properly");
+            session.transfer(flowFile, RELATIONSHIP_FAILURE);
+            context.yield();
+            return;
+        }
+
+        final CompressionCodec codec = getCompressionCodec(context, configuration);
+
+        final String value = context.getProperty(COMPRESSION_TYPE).getValue();
+        final SequenceFile.CompressionType compressionType = value == null
                 ? SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE)
                 : SequenceFile.CompressionType.valueOf(value);
+
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
+
         try {
-            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType);
+            StopWatch stopWatch = new StopWatch(true);
+            flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, configuration, compressionType, codec);
+            session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, RELATIONSHIP_SUCCESS);
             getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
         } catch (ProcessException e) {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
index 2c586e0bb7..f794c3b485 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/SequenceFileWriterImpl.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessSession;
@@ -32,11 +32,11 @@ import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
 import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
 import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
-import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -48,7 +48,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
 
     @Override
     public FlowFile writeSequenceFile(final FlowFile flowFile, final ProcessSession session,
-                                      final Configuration configuration, final CompressionType compressionType) {
+                                      final Configuration configuration, final CompressionType compressionType, final CompressionCodec compressionCodec) {
 
         if (flowFile.getSize() > Integer.MAX_VALUE) {
             throw new IllegalArgumentException("Cannot write " + flowFile
@@ -97,7 +97,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
                     SequenceFile.Writer.stream(fsDataOutputStream),
                     SequenceFile.Writer.keyClass(Text.class),
                     SequenceFile.Writer.valueClass(InputStreamWritable.class),
-                    SequenceFile.Writer.compression(compressionType, new DefaultCodec()))) {
+                    SequenceFile.Writer.compression(compressionType, compressionCodec))) {
 
                 processInputStream(in, flowFile, writer);
 
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
index 6ad6461834..d19e3b698d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/util/SequenceFileWriter.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.hadoop.util;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
 
@@ -31,7 +32,8 @@ public interface SequenceFileWriter {
      * @param session session
      * @param configuration configuration
      * @param compressionType compression type
+     * @param compressionCodec compression codec
      * @return the written to SequenceFile flow file
      */
-    FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType);
+    FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType, CompressionCodec compressionCodec);
 }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
index 419323f2e4..1ac62afc58 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestCreateHadoopSequenceFile.java
@@ -17,7 +17,10 @@ package org.apache.nifi.processors.hadoop;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.DefaultCodec;
 
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -30,8 +33,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -49,7 +50,6 @@ import static org.mockito.Mockito.when;
 public class TestCreateHadoopSequenceFile {
 
     private TestRunner controller;
-    private static Logger LOGGER;
 
     private final File testdata = new File("src/test/resources/testdata");
     private final File[] inFiles = new File[]{new File(testdata, "randombytes-1"),
@@ -61,7 +61,6 @@
 
     @BeforeClass
    public static void setUpClass() {
-        LOGGER = LoggerFactory.getLogger(TestCreateHadoopSequenceFile.class);
         System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
         System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug");
     }
@@ -204,6 +203,147 @@ public class TestCreateHadoopSequenceFile {
         // fos.close();
     }
 
+    @Test
+    public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name());
+        controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+        File inFile = inFiles[0];
+        try (FileInputStream fin = new FileInputStream(inFile) ){
+            controller.enqueue(fin);
+        }
+        controller.run();
+
+        List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+        List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+        assertEquals(0, failedFlowFiles.size());
+        assertEquals(1, successSeqFiles.size());
+
+        MockFlowFile ff = successSeqFiles.iterator().next();
+        byte[] data = ff.toByteArray();
+
+
+        final String magicHeader = new String(data, 0, 3, "UTF-8");
+        assertEquals("SEQ", magicHeader);
+        // Format of header is SEQ followed by the version (1 byte).
+        // Then, the length of the Key type (1 byte), then the Key type
+        // Then, the length of the Value type(1 byte), then the Value type
+        final String keyType = Text.class.getCanonicalName();
+        final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+        final int valueTypeLength = data[5 + keyType.length()];
+        final String valueType = BytesWritable.class.getCanonicalName();
+
+        assertEquals(valueType.length(), valueTypeLength);
+        assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+        final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+        final int blockCompressionIndex = compressionIndex + 1;
+
+        assertEquals(1, data[compressionIndex]);
+        assertEquals(1, data[blockCompressionIndex]);
+
+        final int codecTypeSize = data[blockCompressionIndex + 1];
+        final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+        assertEquals(BZip2Codec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+    }
+
+    @Test
+    public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name());
+        controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+        File inFile = inFiles[0];
+        try (FileInputStream fin = new FileInputStream(inFile) ){
+            controller.enqueue(fin);
+        }
+        controller.run();
+
+        List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+        List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+        assertEquals(0, failedFlowFiles.size());
+        assertEquals(1, successSeqFiles.size());
+
+        MockFlowFile ff = successSeqFiles.iterator().next();
+        byte[] data = ff.toByteArray();
+
+
+        final String magicHeader = new String(data, 0, 3, "UTF-8");
+        assertEquals("SEQ", magicHeader);
+        // Format of header is SEQ followed by the version (1 byte).
+        // Then, the length of the Key type (1 byte), then the Key type
+        // Then, the length of the Value type(1 byte), then the Value type
+        final String keyType = Text.class.getCanonicalName();
+        final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+        final int valueTypeLength = data[5 + keyType.length()];
+        final String valueType = BytesWritable.class.getCanonicalName();
+
+        assertEquals(valueType.length(), valueTypeLength);
+        assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+        final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+        final int blockCompressionIndex = compressionIndex + 1;
+
+        assertEquals(1, data[compressionIndex]);
+        assertEquals(1, data[blockCompressionIndex]);
+
+        final int codecTypeSize = data[blockCompressionIndex + 1];
+        final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+        assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+    }
+
+    @Test
+    public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException {
+
+        controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name());
+        controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
+
+        File inFile = inFiles[0];
+        try (FileInputStream fin = new FileInputStream(inFile) ){
+            controller.enqueue(fin);
+        }
+        controller.run();
+
+        List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
+        List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
+
+        assertEquals(0, failedFlowFiles.size());
+        assertEquals(1, successSeqFiles.size());
+
+        MockFlowFile ff = successSeqFiles.iterator().next();
+        byte[] data = ff.toByteArray();
+
+
+        final String magicHeader = new String(data, 0, 3, "UTF-8");
+        assertEquals("SEQ", magicHeader);
+        // Format of header is SEQ followed by the version (1 byte).
+        // Then, the length of the Key type (1 byte), then the Key type
+        // Then, the length of the Value type(1 byte), then the Value type
+        final String keyType = Text.class.getCanonicalName();
+        final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
+        final int valueTypeLength = data[5 + keyType.length()];
+        final String valueType = BytesWritable.class.getCanonicalName();
+
+        assertEquals(valueType.length(), valueTypeLength);
+        assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
+
+        final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
+        final int blockCompressionIndex = compressionIndex + 1;
+
+        assertEquals(1, data[compressionIndex]);
+        assertEquals(1, data[blockCompressionIndex]);
+
+        final int codecTypeSize = data[blockCompressionIndex + 1];
+        final int codecTypeStartIndex = blockCompressionIndex + 2;
+
+        assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
+    }
+
     private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile {
 
         private KerberosProperties testKerbersProperties;
@@ -217,4 +357,5 @@ public class TestCreateHadoopSequenceFile {
             return testKerbersProperties;
         }
     }
+
 }
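
Note (not part of the patch): the change above routes the codec resolved from the processor's
new compression codec property into SequenceFile.Writer.compression(...), where the writer
previously always received new DefaultCodec(). The following is a minimal, hypothetical sketch
of that underlying Hadoop call in isolation; the class name, output path, and the choice of
BZip2Codec are illustrative only and are not taken from the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

import java.nio.charset.StandardCharsets;

public class SequenceFileCodecSketch {

    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        // Stand-in for the codec the processor now resolves from its compression codec property.
        final CompressionCodec codec = ReflectionUtils.newInstance(BZip2Codec.class, conf);

        try (FSDataOutputStream out = FileSystem.getLocal(conf).create(new Path("target/example.sf"));
             SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                     SequenceFile.Writer.stream(out),
                     SequenceFile.Writer.keyClass(Text.class),
                     SequenceFile.Writer.valueClass(BytesWritable.class),
                     // No longer hard-coded to DefaultCodec: the selected codec is passed through.
                     SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, codec))) {
            writer.append(new Text("key"), new BytesWritable("value".getBytes(StandardCharsets.UTF_8)));
        }
    }
}

In the processor itself the equivalent Writer options are built inside SequenceFileWriterImpl;
only the codec argument changes with this patch.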