NIFI-713: Infer Hadoop Compression Automatically

- Three main types of compression options:
	NONE 		: no compression
	AUTOMATIC 	: infers codec by extension
	SPECIFIED	: specified codec (e.g. snappy, gzip, bzip, or lz4)
This commit is contained in:
ricky 2015-08-31 14:38:15 -04:00
parent 2bb7853001
commit c72fb201d5
5 changed files with 111 additions and 38 deletions

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
@ -59,6 +58,34 @@ import org.apache.nifi.util.Tuple;
* This is a base class that is helpful when building processors interacting with HDFS. * This is a base class that is helpful when building processors interacting with HDFS.
*/ */
public abstract class AbstractHadoopProcessor extends AbstractProcessor { public abstract class AbstractHadoopProcessor extends AbstractProcessor {
/**
* Compression Type Enum
*/
public enum CompressionType {
NONE,
DEFAULT,
BZIP,
GZIP,
LZ4,
SNAPPY,
AUTOMATIC;
@Override
public String toString() {
switch (this) {
case NONE: return "NONE";
case DEFAULT: return DefaultCodec.class.getName();
case BZIP: return BZip2Codec.class.getName();
case GZIP: return GzipCodec.class.getName();
case LZ4: return Lz4Codec.class.getName();
case SNAPPY: return SnappyCodec.class.getName();
case AUTOMATIC: return "Automatically Detected";
}
return null;
}
}
private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() { private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
@Override @Override
public ValidationResult validate(String subject, String input, ValidationContext context) { public ValidationResult validate(String subject, String input, ValidationContext context) {
@ -94,8 +121,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public static final String DIRECTORY_PROP_NAME = "Directory"; public static final String DIRECTORY_PROP_NAME = "Directory";
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false) public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
.allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build(); .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false) public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
.description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID) .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
@ -324,10 +351,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
* the Hadoop Configuration * the Hadoop Configuration
* @return CompressionCodec or null * @return CompressionCodec or null
*/ */
protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) { protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
CompressionCodec codec = null; org.apache.hadoop.io.compress.CompressionCodec codec = null;
if (context.getProperty(COMPRESSION_CODEC).isSet()) { if (context.getProperty(COMPRESSION_CODEC).isSet()) {
String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue(); String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
CompressionCodecFactory ccf = new CompressionCodecFactory(configuration); CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
codec = ccf.getCodecByClassName(compressionClassname); codec = ccf.getCodecByClassName(compressionClassname);
} }

View File

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -152,7 +153,8 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
sequenceFileWriter = new SequenceFileWriterImpl(); sequenceFileWriter = new SequenceFileWriterImpl();
} }
String value = context.getProperty(COMPRESSION_TYPE).getValue(); String value = context.getProperty(COMPRESSION_TYPE).getValue();
CompressionType compressionType = value == null ? CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : CompressionType.valueOf(value); SequenceFile.CompressionType compressionType = value == null ?
SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf"; final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
try { try {

View File

@ -33,18 +33,20 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
@ -332,6 +334,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) { protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
// process the batch of files // process the batch of files
InputStream stream = null; InputStream stream = null;
CompressionCodec codec = null;
Configuration conf = getConfiguration(); Configuration conf = getConfiguration();
FileSystem hdfs = getFileSystem(); FileSystem hdfs = getFileSystem();
final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean(); final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@ -339,19 +342,36 @@ public class GetHDFS extends AbstractHadoopProcessor {
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT); BUFFER_SIZE_DEFAULT);
final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue()); final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
final CompressionCodec codec = getCompressionCodec(context, conf);
final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
if (inferCompressionCodec || compressionType != CompressionType.NONE) {
codec = getCompressionCodec(context, getConfiguration());
}
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
for (final Path file : files) { for (final Path file : files) {
try { try {
if (!hdfs.exists(file)) { if (!hdfs.exists(file)) {
continue; // if file is no longer there then move on continue; // if file is no longer there then move on
} }
final String filename = file.getName(); final String originalFilename = file.getName();
final String relativePath = getPathDifference(rootDir, file); final String relativePath = getPathDifference(rootDir, file);
stream = hdfs.open(file, bufferSize); stream = hdfs.open(file, bufferSize);
final String outputFilename;
// Check if we should infer compression codec
if (inferCompressionCodec) {
codec = compressionCodecFactory.getCodec(file);
}
// Check if compression codec is defined (inferred or otherwise)
if (codec != null) { if (codec != null) {
stream = codec.createInputStream(stream); stream = codec.createInputStream(stream);
outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
} else {
outputFilename = originalFilename;
} }
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
@ -361,7 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS); final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath); flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename); flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
if (!keepSourceFiles && !hdfs.delete(file, false)) { if (!keepSourceFiles && !hdfs.delete(file, false)) {
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
@ -370,7 +390,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
continue; continue;
} }
final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename; final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
session.getProvenanceReporter().receive(flowFile, transitUri); session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",

View File

@ -219,13 +219,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
final CompressionCodec codec = getCompressionCodec(context, configuration); final CompressionCodec codec = getCompressionCodec(context, configuration);
final String filename = codec != null
? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: flowFile.getAttribute(CoreAttributes.FILENAME.key());
Path tempDotCopyFile = null; Path tempDotCopyFile = null;
try { try {
final Path tempCopyFile; final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
final Path copyFile; final Path copyFile = new Path(configuredRootDirPath, filename);
tempCopyFile = new Path(configuredRootDirPath, "." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
copyFile = new Path(configuredRootDirPath, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
// Create destination directory if it does not exist // Create destination directory if it does not exist
try { try {
@ -327,8 +328,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, copyFile, millis, dataRate}); new Object[]{flowFile, copyFile, millis, dataRate});
final String filename = copyFile.toString(); final String outputPath = copyFile.toString();
final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename; final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
session.getProvenanceReporter().send(flowFile, transitUri); session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);

View File

@ -33,8 +33,6 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -108,19 +106,6 @@ public class GetHDFSTest {
for (ValidationResult vr : results) { for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age")); Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
} }
results = new HashSet<>();
runner.setProperty(GetHDFS.DIRECTORY, "/target");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}
} }
@Test @Test
@ -138,18 +123,56 @@ public class GetHDFSTest {
} }
@Test @Test
public void testGetFilesWithCompression() throws IOException { public void testAutomaticDecompression() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class); TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata"); runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz"); runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true"); runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.run(); runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS); List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size()); assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0); MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz")); assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1"); InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
flowFile.assertContentEquals(expected); flowFile.assertContentEquals(expected);
} }
@Test
public void testInferCompressionCodecDisabled() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "NONE");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
flowFile.assertContentEquals(expected);
}
@Test
public void testFileExtensionNotACompressionCodec() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
flowFile.assertContentEquals(expected);
}
} }