Merge branch 'NIFI-713' of https://github.com/rickysaltzer/nifi into NIFI-713

This commit is contained in:
Mark Payne 2015-09-23 12:06:50 -04:00
commit da56b49b88
5 changed files with 111 additions and 38 deletions

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
@ -59,6 +58,34 @@ import org.apache.nifi.util.Tuple;
* This is a base class that is helpful when building processors interacting with HDFS.
*/
public abstract class AbstractHadoopProcessor extends AbstractProcessor {
/**
* Compression Type Enum
*/
public enum CompressionType {
NONE,
DEFAULT,
BZIP,
GZIP,
LZ4,
SNAPPY,
AUTOMATIC;
@Override
public String toString() {
switch (this) {
case NONE: return "NONE";
case DEFAULT: return DefaultCodec.class.getName();
case BZIP: return BZip2Codec.class.getName();
case GZIP: return GzipCodec.class.getName();
case LZ4: return Lz4Codec.class.getName();
case SNAPPY: return SnappyCodec.class.getName();
case AUTOMATIC: return "Automatically Detected";
}
return null;
}
}
private static final Validator KERBEROS_CONFIG_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
@ -94,8 +121,8 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public static final String DIRECTORY_PROP_NAME = "Directory";
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(false)
.allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(), GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName()).build();
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("Compression codec").required(true)
.allowableValues(CompressionType.values()).defaultValue(CompressionType.NONE.toString()).build();
public static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder().name("Kerberos Principal").required(false)
.description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set " + "in your nifi.properties").addValidator(Validator.VALID)
@ -324,10 +351,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
* the Hadoop Configuration
* @return CompressionCodec or null
*/
protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
CompressionCodec codec = null;
protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
org.apache.hadoop.io.compress.CompressionCodec codec = null;
if (context.getProperty(COMPRESSION_CODEC).isSet()) {
String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue();
String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
codec = ccf.getCodecByClassName(compressionClassname);
}

View File

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -152,7 +153,8 @@ public class CreateHadoopSequenceFile extends AbstractHadoopProcessor {
sequenceFileWriter = new SequenceFileWriterImpl();
}
String value = context.getProperty(COMPRESSION_TYPE).getValue();
CompressionType compressionType = value == null ? CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : CompressionType.valueOf(value);
SequenceFile.CompressionType compressionType = value == null ?
SequenceFile.CompressionType.valueOf(DEFAULT_COMPRESSION_TYPE) : SequenceFile.CompressionType.valueOf(value);
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".sf";
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), fileName);
try {

View File

@ -33,18 +33,20 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@ -332,6 +334,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
// process the batch of files
InputStream stream = null;
CompressionCodec codec = null;
Configuration conf = getConfiguration();
FileSystem hdfs = getFileSystem();
final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@ -339,19 +342,36 @@ public class GetHDFS extends AbstractHadoopProcessor {
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
final CompressionCodec codec = getCompressionCodec(context, conf);
final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
if (inferCompressionCodec || compressionType != CompressionType.NONE) {
codec = getCompressionCodec(context, getConfiguration());
}
final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
for (final Path file : files) {
try {
if (!hdfs.exists(file)) {
continue; // if file is no longer there then move on
}
final String filename = file.getName();
final String originalFilename = file.getName();
final String relativePath = getPathDifference(rootDir, file);
stream = hdfs.open(file, bufferSize);
final String outputFilename;
// Check if we should infer compression codec
if (inferCompressionCodec) {
codec = compressionCodecFactory.getCodec(file);
}
// Check if compression codec is defined (inferred or otherwise)
if (codec != null) {
stream = codec.createInputStream(stream);
outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
} else {
outputFilename = originalFilename;
}
FlowFile flowFile = session.create();
final StopWatch stopWatch = new StopWatch(true);
@ -361,7 +381,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), filename);
flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
if (!keepSourceFiles && !hdfs.delete(file, false)) {
getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...",
@ -370,7 +390,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
continue;
}
final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
final String transitUri = (originalFilename.startsWith("/")) ? "hdfs:/" + originalFilename : "hdfs://" + originalFilename;
session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}",

View File

@ -219,13 +219,14 @@ public class PutHDFS extends AbstractHadoopProcessor {
final CompressionCodec codec = getCompressionCodec(context, configuration);
final String filename = codec != null
? flowFile.getAttribute(CoreAttributes.FILENAME.key()) + codec.getDefaultExtension()
: flowFile.getAttribute(CoreAttributes.FILENAME.key());
Path tempDotCopyFile = null;
try {
final Path tempCopyFile;
final Path copyFile;
tempCopyFile = new Path(configuredRootDirPath, "." + flowFile.getAttribute(CoreAttributes.FILENAME.key()));
copyFile = new Path(configuredRootDirPath, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
final Path tempCopyFile = new Path(configuredRootDirPath, "." + filename);
final Path copyFile = new Path(configuredRootDirPath, filename);
// Create destination directory if it does not exist
try {
@ -327,8 +328,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}",
new Object[]{flowFile, copyFile, millis, dataRate});
final String filename = copyFile.toString();
final String transitUri = (filename.startsWith("/")) ? "hdfs:/" + filename : "hdfs://" + filename;
final String outputPath = copyFile.toString();
final String transitUri = (outputPath.startsWith("/")) ? "hdfs:/" + outputPath : "hdfs://" + outputPath;
session.getProvenanceReporter().send(flowFile, transitUri);
session.transfer(flowFile, REL_SUCCESS);

View File

@ -33,8 +33,6 @@ import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Assert;
import org.junit.Test;
@ -108,19 +106,6 @@ public class GetHDFSTest {
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
}
results = new HashSet<>();
runner.setProperty(GetHDFS.DIRECTORY, "/target");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}
}
@Test
@ -138,18 +123,56 @@ public class GetHDFSTest {
}
@Test
public void testGetFilesWithCompression() throws IOException {
public void testAutomaticDecompression() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz"));
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1"));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
flowFile.assertContentEquals(expected);
}
@Test
public void testInferCompressionCodecDisabled() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "NONE");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("randombytes-1.gz"));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1.gz");
flowFile.assertContentEquals(expected);
}
@Test
public void testFileExtensionNotACompressionCodec() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, ".*.zip");
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, "AUTOMATIC");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).equals("13545423550275052.zip"));
InputStream expected = getClass().getResourceAsStream("/testdata/13545423550275052.zip");
flowFile.assertContentEquals(expected);
}
}