NIFI-1797 - Added compression codec property to CreateHadoopSequenceFile processor

This closes: #1387

Signed-off-by: Andre F de Miranda <trixpan@users.noreply.github.com>
This commit is contained in:
Pierre Villard 2017-01-03 20:47:56 +01:00 committed by Andre F de Miranda
parent a32e1509b6
commit dcdfd3dad9
4 changed files with 176 additions and 13 deletions

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.nifi.processors.hadoop; package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -31,12 +33,14 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
import org.apache.nifi.util.StopWatch;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
/** /**
* <p> * <p>
@ -88,7 +92,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
} }
// Optional Properties. // Optional Properties.
static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("compression type") .name("Compression type")
.description("Type of compression to use when creating Sequence File") .description("Type of compression to use when creating Sequence File")
.allowableValues(SequenceFile.CompressionType.values()) .allowableValues(SequenceFile.CompressionType.values())
.build(); .build();
@ -105,6 +109,7 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
public List<PropertyDescriptor> getSupportedPropertyDescriptors() { public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> someProps = new ArrayList<>(properties); List<PropertyDescriptor> someProps = new ArrayList<>(properties);
someProps.add(COMPRESSION_TYPE); someProps.add(COMPRESSION_TYPE);
someProps.add(COMPRESSION_CODEC);
return someProps; return someProps;
} }
@ -149,13 +154,28 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
default: default:
sequenceFileWriter = new SequenceFileWriterImpl(); sequenceFileWriter = new SequenceFileWriterImpl();
} }
String value = context.getProperty(COMPRESSION_TYPE).getValue();
SequenceFile.CompressionType compressionType = value == null final Configuration configuration = getConfiguration();
if (configuration == null) {
getLogger().error("HDFS not configured properly");
session.transfer(flowFile, RELATIONSHIP_FAILURE);
context.yield();
return;
}
final CompressionCodec codec = getCompressionCodec(context, configuration);
final String value = context.getProperty(COMPRESSION_TYPE).getValue();
final SequenceFile.CompressionType compressionType = value == null
? SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value); ? SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf"; final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
try { try {
flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, getConfiguration(), compressionType); StopWatch stopWatch = new StopWatch(true);
flowFile = sequenceFileWriter.writeSequenceFile(flowFile, session, configuration, compressionType, codec);
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, RELATIONSHIP_SUCCESS); session.transfer(flowFile, RELATIONSHIP_SUCCESS);
getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS}); getLogger().info("Transferred flowfile {} to {}", new Object[]{flowFile, RELATIONSHIP_SUCCESS});
} catch (ProcessException e) { } catch (ProcessException e) {

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -32,11 +32,11 @@ import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream; import org.apache.nifi.processors.hadoop.util.ByteFilteringOutputStream;
import org.apache.nifi.processors.hadoop.util.InputStreamWritable; import org.apache.nifi.processors.hadoop.util.InputStreamWritable;
import org.apache.nifi.processors.hadoop.util.SequenceFileWriter; import org.apache.nifi.processors.hadoop.util.SequenceFileWriter;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -48,7 +48,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
@Override @Override
public FlowFile writeSequenceFile(final FlowFile flowFile, final ProcessSession session, public FlowFile writeSequenceFile(final FlowFile flowFile, final ProcessSession session,
final Configuration configuration, final CompressionType compressionType) { final Configuration configuration, final CompressionType compressionType, final CompressionCodec compressionCodec) {
if (flowFile.getSize() > Integer.MAX_VALUE) { if (flowFile.getSize() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Cannot write " + flowFile throw new IllegalArgumentException("Cannot write " + flowFile
@ -97,7 +97,7 @@ public class SequenceFileWriterImpl implements SequenceFileWriter {
SequenceFile.Writer.stream(fsDataOutputStream), SequenceFile.Writer.stream(fsDataOutputStream),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(InputStreamWritable.class), SequenceFile.Writer.valueClass(InputStreamWritable.class),
SequenceFile.Writer.compression(compressionType, new DefaultCodec()))) { SequenceFile.Writer.compression(compressionType, compressionCodec))) {
processInputStream(in, flowFile, writer); processInputStream(in, flowFile, writer);

View File

@ -18,6 +18,7 @@ package org.apache.nifi.processors.hadoop.util;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@ -31,7 +32,8 @@ public interface SequenceFileWriter {
* @param session session * @param session session
* @param configuration configuration * @param configuration configuration
* @param compressionType compression type * @param compressionType compression type
* @param compressionCodec compression codec
* @return the written to SequenceFile flow file * @return the written to SequenceFile flow file
*/ */
FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType); FlowFile writeSequenceFile(FlowFile flowFile, ProcessSession session, Configuration configuration, CompressionType compressionType, CompressionCodec compressionCodec);
} }

View File

@ -17,7 +17,10 @@
package org.apache.nifi.processors.hadoop; package org.apache.nifi.processors.hadoop;
import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -30,8 +33,6 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
@ -49,7 +50,6 @@ import static org.mockito.Mockito.when;
public class TestCreateHadoopSequenceFile { public class TestCreateHadoopSequenceFile {
private TestRunner controller; private TestRunner controller;
private static Logger LOGGER;
private final File testdata = new File("src/test/resources/testdata"); private final File testdata = new File("src/test/resources/testdata");
private final File[] inFiles = new File[]{new File(testdata, "randombytes-1"), private final File[] inFiles = new File[]{new File(testdata, "randombytes-1"),
@ -61,7 +61,6 @@ public class TestCreateHadoopSequenceFile {
@BeforeClass @BeforeClass
public static void setUpClass() { public static void setUpClass() {
LOGGER = LoggerFactory.getLogger(TestCreateHadoopSequenceFile.class);
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug"); System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hadoop", "debug");
} }
@ -204,6 +203,147 @@ public class TestCreateHadoopSequenceFile {
// fos.close(); // fos.close();
} }
@Test
public void testSequenceFileBzipCompressionCodec() throws UnsupportedEncodingException, IOException {
controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.BZIP.name());
controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
File inFile = inFiles[0];
try (FileInputStream fin = new FileInputStream(inFile) ){
controller.enqueue(fin);
}
controller.run();
List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
assertEquals(0, failedFlowFiles.size());
assertEquals(1, successSeqFiles.size());
MockFlowFile ff = successSeqFiles.iterator().next();
byte[] data = ff.toByteArray();
final String magicHeader = new String(data, 0, 3, "UTF-8");
assertEquals("SEQ", magicHeader);
// Format of header is SEQ followed by the version (1 byte).
// Then, the length of the Key type (1 byte), then the Key type
// Then, the length of the Value type(1 byte), then the Value type
final String keyType = Text.class.getCanonicalName();
final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
final int valueTypeLength = data[5 + keyType.length()];
final String valueType = BytesWritable.class.getCanonicalName();
assertEquals(valueType.length(), valueTypeLength);
assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
final int blockCompressionIndex = compressionIndex + 1;
assertEquals(1, data[compressionIndex]);
assertEquals(1, data[blockCompressionIndex]);
final int codecTypeSize = data[blockCompressionIndex + 1];
final int codecTypeStartIndex = blockCompressionIndex + 2;
assertEquals(BZip2Codec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
}
@Test
public void testSequenceFileDefaultCompressionCodec() throws UnsupportedEncodingException, IOException {
controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.DEFAULT.name());
controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
File inFile = inFiles[0];
try (FileInputStream fin = new FileInputStream(inFile) ){
controller.enqueue(fin);
}
controller.run();
List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
assertEquals(0, failedFlowFiles.size());
assertEquals(1, successSeqFiles.size());
MockFlowFile ff = successSeqFiles.iterator().next();
byte[] data = ff.toByteArray();
final String magicHeader = new String(data, 0, 3, "UTF-8");
assertEquals("SEQ", magicHeader);
// Format of header is SEQ followed by the version (1 byte).
// Then, the length of the Key type (1 byte), then the Key type
// Then, the length of the Value type(1 byte), then the Value type
final String keyType = Text.class.getCanonicalName();
final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
final int valueTypeLength = data[5 + keyType.length()];
final String valueType = BytesWritable.class.getCanonicalName();
assertEquals(valueType.length(), valueTypeLength);
assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
final int blockCompressionIndex = compressionIndex + 1;
assertEquals(1, data[compressionIndex]);
assertEquals(1, data[blockCompressionIndex]);
final int codecTypeSize = data[blockCompressionIndex + 1];
final int codecTypeStartIndex = blockCompressionIndex + 2;
assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
}
@Test
public void testSequenceFileNoneCompressionCodec() throws UnsupportedEncodingException, IOException {
controller.setProperty(AbstractHadoopProcessor.COMPRESSION_CODEC, AbstractHadoopProcessor.CompressionType.NONE.name());
controller.setProperty(CreateHadoopSequenceFile.COMPRESSION_TYPE, SequenceFile.CompressionType.BLOCK.name());
File inFile = inFiles[0];
try (FileInputStream fin = new FileInputStream(inFile) ){
controller.enqueue(fin);
}
controller.run();
List<MockFlowFile> successSeqFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_SUCCESS);
List<MockFlowFile> failedFlowFiles = controller.getFlowFilesForRelationship(CreateHadoopSequenceFile.RELATIONSHIP_FAILURE);
assertEquals(0, failedFlowFiles.size());
assertEquals(1, successSeqFiles.size());
MockFlowFile ff = successSeqFiles.iterator().next();
byte[] data = ff.toByteArray();
final String magicHeader = new String(data, 0, 3, "UTF-8");
assertEquals("SEQ", magicHeader);
// Format of header is SEQ followed by the version (1 byte).
// Then, the length of the Key type (1 byte), then the Key type
// Then, the length of the Value type(1 byte), then the Value type
final String keyType = Text.class.getCanonicalName();
final int valueTypeStart = 3 + 1 + 1 + keyType.length() + 1;
final int valueTypeLength = data[5 + keyType.length()];
final String valueType = BytesWritable.class.getCanonicalName();
assertEquals(valueType.length(), valueTypeLength);
assertEquals(valueType, new String(data, valueTypeStart, valueType.length(), "UTF-8"));
final int compressionIndex = 3 + 1 + 1 + keyType.length() + 1 + valueType.length();
final int blockCompressionIndex = compressionIndex + 1;
assertEquals(1, data[compressionIndex]);
assertEquals(1, data[blockCompressionIndex]);
final int codecTypeSize = data[blockCompressionIndex + 1];
final int codecTypeStartIndex = blockCompressionIndex + 2;
assertEquals(DefaultCodec.class.getCanonicalName(), new String(data, codecTypeStartIndex, codecTypeSize, "UTF-8"));
}
private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile { private static class TestableCreateHadoopSequenceFile extends CreateHadoopSequenceFile {
private KerberosProperties testKerbersProperties; private KerberosProperties testKerbersProperties;
@ -217,4 +357,5 @@ public class TestCreateHadoopSequenceFile {
return testKerbersProperties; return testKerbersProperties;
} }
} }
} }