From c72fb201d52113a16b176be3e0337b4303124244 Mon Sep 17 00:00:00 2001
From: ricky
Date: Mon, 31 Aug 2015 14:38:15 -0400
Subject: [PATCH] NIFI-713: Infer Hadoop Compression Automatically

- Three compression options:
  NONE      : no compression
  AUTOMATIC : infers codec by extension
  SPECIFIED : specified codec (e.g. snappy, gzip, bzip, or lz4)
---
 .../hadoop/AbstractHadoopProcessor.java            | 39 ++++++++++--
 .../hadoop/CreateHadoopSequenceFile.java           |  4 +-
 .../nifi/processors/hadoop/GetHDFS.java            | 32 ++++++++--
 .../nifi/processors/hadoop/PutHDFS.java            | 15 ++---
 .../nifi/processors/hadoop/GetHDFSTest.java        | 59 +++++++++++++------
 5 files changed, 111 insertions(+), 38 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 8519d2c8c6..0102b1f915 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -59,6 +58,34 @@ import org.apache.nifi.util.Tuple;
  * This is a base class that is helpful when building processors interacting with HDFS.
  */
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
+    /**
+     * Compression Type Enum
+     */
+    public enum CompressionType {
+        NONE,
+        DEFAULT,
+        BZIP,
+        GZIP,
+        LZ4,
+        SNAPPY,
+        AUTOMATIC;
+
+        @Override
+        public String toString() {
+            switch (this) {
+                case NONE: return "NONE";
+                case DEFAULT: return DefaultCodec.class.getName();
+                case BZIP: return BZip2Codec.class.getName();
+                case GZIP: return GzipCodec.class.getName();
+                case LZ4: return Lz4Codec.class.getName();
+                case SNAPPY: return SnappyCodec.class.getName();
+                case AUTOMATIC: return "Automatically Detected";
+            }
+            return null;
+        }
+    }
+
+
     private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
         @Override
         public ValidationResult validate(String subject, String input, ValidationContext context) {
@@ -94,8 +121,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
 
     public static final String DIRECTORY_PROP_NAME = "Directory";
 
-    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false)
-            .allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
+    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
+            .allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
 
     public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
             .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
@@ -324,10 +351,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
      * the Hadoop Configuration
      * @return CompressionCodec or null
      */
-    protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
-        CompressionCodec codec = null;
+    protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
+        org.apache.hadoop.io.compress.CompressionCodec codec = null;
         if (context.getProperty(COMPRESSION_CODEC).isSet()) {
-            String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue();
+            String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
             CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
             codec = ccf.getCodecByClassName(compressionClassname);
         }
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
index 186a290b60..1f55164180 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/CreateHadoopSequenceFile.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -152,7 +153,8 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
             sequenceFileWriter = new SequenceFileWriterImpl();
         }
         String value = context.getProperty(COMPRESSION_TYPE).getValue();
-        CompressionType compressionType = value == null ? CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : CompressionType.valueOf(value);
+        SequenceFile.CompressionType compressionType = value == null ?
+                SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
         final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
         flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
         try {
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
index cac61b0100..de776d4806 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/GetHDFS.java
@@ -33,18 +33,20 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -332,6 +334,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
     protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
         // process the batch of files
         InputStream stream = null;
+        CompressionCodec codec = null;
         Configuration conf = getConfiguration();
         FileSystem hdfs = getFileSystem();
         final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@@ -339,19 +342,36 @@ public class GetHDFS extends AbstractHadoopProcessor {
         int bufferSize = bufferSizeProp != null ?
                 bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
         final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
-        final CompressionCodec codec = getCompressionCodec(context, conf);
+
+        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
+        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
+        if (inferCompressionCodec || compressionType != CompressionType.NONE) {
+            codec = getCompressionCodec(context, getConfiguration());
+        }
+        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
         for (final Path file : files) {
             try {
                 if (!hdfs.exists(file)) {
                     continue; // if file is no longer there then move on
                 }
-                final String filename = file.getName();
+                final String originalFilename = file.getName();
                 final String relativePath = getPathDifference(rootDir, file);
                 stream = hdfs.open(file, bufferSize);
+
+                final String outputFilename;
+                // Check if we should infer compression codec
+                if (inferCompressionCodec) {
+                    codec = compressionCodecFactory.getCodec(file);
+                }
+                // Check if compression codec is defined (inferred or otherwise)
                 if (codec != null) {
                     stream = codec.createInputStream(stream);
+                    outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
+                } else {
+                    outputFilename = originalFilename;
                 }
+
                 FlowFile flowFile = session.create();
                 final StopWatch stopWatch = new StopWatch(true);
@@ -361,7 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                 final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
-                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename);
+                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
 
                 if (!keepSourceFiles && !hdfs.delete(file, false)) {
                     getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
@@ -370,7 +390,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
                     continue;
                 }
 
-                final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+                final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
                 session.getProvenanceReporter().receive(flowFile, transitUri);
                 session.transfer(flowFile, REL_SUCCESS);
                 getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 419b1debd1..901159bbb8 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -219,13 +219,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
 
             final CompressionCodec codec = getCompressionCodec(context, configuration);
 
+            final String filename = codec != null
+                    ? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
+                    : flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
             Path tempDotCopyFile = null;
             try {
-                final Path tempCopyFile;
-                final Path copyFile;
-
-                tempCopyFile = new Path(configuredRootDirPath, "." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
-                copyFile = new Path(configuredRootDirPath, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
+                final Path copyFile = new Path(configuredRootDirPath, filename);
 
                 // Create destination directory if it does not exist
                 try {
@@ -327,8 +328,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
                         new Object[]{flowFile, copyFile, millis, dataRate});
 
-                final String filename = copyFile.toString();
-                final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
+                final String outputPath = copyFile.toString();
+                final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
                 session.getProvenanceReporter().send(flowFile, transitUri);
                 session.transfer(flowFile, REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
index b0dd17fdb4..e8714dd23e 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/GetHDFSTest.java
@@ -33,8 +33,6 @@
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
 import org.junit.Assert;
 import org.junit.Test;
@@ -108,19 +106,6 @@ public class GetHDFSTest {
         for (ValidationResult vr : results) {
             Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
         }
-
-        results = new HashSet<>();
-        runner.setProperty(GetHDFS.DIRECTORY, "/target");
-        runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
-        runner.enqueue(new byte[0]);
-        pc = runner.getProcessContext();
-        if (pc instanceof MockProcessContext) {
-            results = ((MockProcessContext) pc).validate();
-        }
-        Assert.assertEquals(1, results.size());
-        for (ValidationResult vr : results) {
-            Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
-        }
     }
 
     @Test
@@ -138,18 +123,56 @@ public class GetHDFSTest {
     }
 
     @Test
-    public void testGetFilesWithCompression() throws IOException {
+    public void testAutomaticDecompression() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
         runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
         runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
-        runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
         runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
         runner.run();
+
         List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
+
         MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz"));
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
         InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
         flowFile.assertContentEquals(expected);
     }
+
+    @Test
+    public void testInferCompressionCodecDisabled() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "NONE");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
+        flowFile.assertContentEquals(expected);
+    }
+
+    @Test
+    public void testFileExtensionNotACompressionCodec() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
+        runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
+        runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
+        runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
+        runner.run();
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
+        InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
+        flowFile.assertContentEquals(expected);
+    }
 }
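
Note on the AUTOMATIC option (illustrative sketch, not part of the patch): Hadoop's CompressionCodecFactory maps a file's extension to a registered codec, and GetHDFS then strips that codec's default extension from the FlowFile filename. The class name and input path below are hypothetical; the CompressionCodecFactory, CompressionCodec, and StringUtils calls are the ones the patch itself uses.

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecInferenceSketch {
        public static void main(String[] args) {
            // The factory knows the built-in codecs and their extensions (e.g. .gz, .bz2).
            CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

            Path file = new Path("/testdata/randombytes-1.gz"); // hypothetical input path
            CompressionCodec codec = factory.getCodec(file);    // GzipCodec here; null for an unknown extension such as .zip

            if (codec != null) {
                // Mirrors GetHDFS: drop the codec's default extension so the filename matches the decompressed content.
                String outputFilename = StringUtils.removeEnd(file.getName(), codec.getDefaultExtension());
                System.out.println(codec.getClass().getName() + " -> " + outputFilename);
            } else {
                // Mirrors the .zip test above: no codec inferred, filename and content pass through unchanged.
                System.out.println("No codec inferred for " + file.getName());
            }
        }
    }

This is the behaviour exercised by testAutomaticDecompression and testFileExtensionNotACompressionCodec.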