NIFI-600: Adding compression support to PutHDFS and GetHDFS

Tim Reardon 2015-05-07 20:23:30 -04:00
parent e1aa4890a0
commit a973cc4f2a
6 changed files with 98 additions and 7 deletions

AbstractHadoopProcessor.java

@@ -42,6 +42,13 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.net.NetUtils;
/**
@@ -60,6 +67,13 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
public static final String DIRECTORY_PROP_NAME = "Directory";
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression codec")
.required(false)
.allowableValues(BZip2Codec.class.getName(), DefaultCodec.class.getName(),
GzipCodec.class.getName(), Lz4Codec.class.getName(), SnappyCodec.class.getName())
.build();
protected static final List<PropertyDescriptor> properties;
static {
@@ -228,6 +242,23 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
};
}
/**
* Returns the configured CompressionCodec, or null if none is configured.
*
* @param context the ProcessContext
* @param configuration the Hadoop Configuration
* @return CompressionCodec or null
*/
protected CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) {
CompressionCodec codec = null;
if (context.getProperty(COMPRESSION_CODEC).isSet()) {
String compressionClassname = context.getProperty(COMPRESSION_CODEC).getValue();
CompressionCodecFactory ccf = new CompressionCodecFactory(configuration);
codec = ccf.getCodecByClassName(compressionClassname);
}
return codec;
}
/**
* Returns the relative path of the child that does not include the filename

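Note: the new getCompressionCodec() helper above delegates the class-name lookup to Hadoop's CompressionCodecFactory, which instantiates the codec and wires in the Configuration. For reference, a minimal standalone sketch of that lookup outside NiFi (the class and variable names here are illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class CodecLookupSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            // The factory resolves the fully-qualified class name, creates the codec,
            // and hands it the Configuration, just as getCompressionCodec() relies on.
            CompressionCodec codec = factory.getCodecByClassName(GzipCodec.class.getName());
            System.out.println(codec.getDefaultExtension()); // ".gz"
        }
    }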
GetHDFS.java

@@ -17,6 +17,7 @@
package org.apache.nifi.processors.hadoop;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,11 +34,11 @@ import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -192,6 +193,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
props.add(POLLING_INTERVAL);
props.add(BATCH_SIZE);
props.add(BUFFER_SIZE);
props.add(COMPRESSION_CODEC);
localProperties = Collections.unmodifiableList(props);
}
@@ -329,7 +331,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
// process the batch of files
FSDataInputStream stream = null;
InputStream stream = null;
Configuration conf = getConfiguration();
FileSystem hdfs = getFileSystem();
final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
@@ -337,6 +339,7 @@ public class GetHDFS extends AbstractHadoopProcessor {
int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY,
BUFFER_SIZE_DEFAULT);
final Path rootDir = new Path(context.getProperty(DIRECTORY).getValue());
final CompressionCodec codec = getCompressionCodec(context, conf);
for (final Path file : files) {
try {
if (!hdfs.exists(file)) {
@@ -346,6 +349,9 @@ public class GetHDFS extends AbstractHadoopProcessor {
final String relativePath = getPathDifference(rootDir, file);
stream = hdfs.open(file, bufferSize);
if (codec != null) {
stream = codec.createInputStream(stream);
}
FlowFile flowFile = session.create();
final StopWatch stopWatch = new StopWatch(true);

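Note: GetHDFS still opens the file as before; when a codec is configured, the raw stream is wrapped so the FlowFile content is the decompressed bytes. A minimal sketch of the same read path outside the processor, assuming a gzip file at a placeholder path and buffer size:

    import java.io.InputStream;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class ReadCompressedSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem hdfs = FileSystem.get(conf);
            CompressionCodec codec = new CompressionCodecFactory(conf)
                    .getCodecByClassName(GzipCodec.class.getName());

            // Open the raw stream, then wrap it so callers read decompressed bytes,
            // matching the codec.createInputStream(stream) call added above.
            InputStream stream = hdfs.open(new Path("/tmp/example.gz"), 4096);
            if (codec != null) {
                stream = codec.createInputStream(stream);
            }
            byte[] content = IOUtils.toByteArray(stream);
            stream.close();
            System.out.println("read " + content.length + " decompressed bytes");
        }
    }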
PutHDFS.java

@@ -19,6 +19,7 @@ package org.apache.nifi.processors.hadoop;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -27,10 +28,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -157,6 +158,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
props.add(UMASK);
props.add(REMOTE_OWNER);
props.add(REMOTE_GROUP);
props.add(COMPRESSION_CODEC);
localProperties = Collections.unmodifiableList(props);
}
@@ -215,6 +217,8 @@ public class PutHDFS extends AbstractHadoopProcessor {
final short replication = replicationProp != null ? replicationProp.shortValue() : hdfs
.getDefaultReplication(configuredRootDirPath);
final CompressionCodec codec = getCompressionCodec(context, configuration);
Path tempDotCopyFile = null;
try {
final Path tempCopyFile;
@@ -266,10 +270,13 @@ public class PutHDFS extends AbstractHadoopProcessor {
@Override
public void process(InputStream in) throws IOException {
FSDataOutputStream fos = null;
OutputStream fos = null;
Path createdFile = null;
try {
fos = hdfs.create(tempCopyFile, true, bufferSize, replication, blockSize);
if (codec != null) {
fos = codec.createOutputStream(fos);
}
createdFile = tempCopyFile;
BufferedInputStream bis = new BufferedInputStream(in);
StreamUtils.copy(bis, fos);

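Note: on the write side, PutHDFS keeps the existing hdfs.create(...) call and, when a codec is configured, layers the codec's output stream on top so the bytes land compressed. A standalone sketch of that pattern; the target path, replication, and block size below are placeholder values, not values taken from the processor:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class WriteCompressedSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem hdfs = FileSystem.get(conf);
            CompressionCodec codec = new CompressionCodecFactory(conf)
                    .getCodecByClassName(GzipCodec.class.getName());

            InputStream in = new ByteArrayInputStream("hello hdfs".getBytes(StandardCharsets.UTF_8));

            // Create the raw HDFS stream, then wrap it so bytes are compressed on the way out,
            // matching the codec.createOutputStream(fos) call added above.
            OutputStream fos = hdfs.create(new Path("/tmp/example.txt.gz"), true, 4096, (short) 1, 67108864L);
            if (codec != null) {
                fos = codec.createOutputStream(fos);
            }
            IOUtils.copy(in, fos);
            fos.close(); // closing flushes the compressor and writes the gzip trailer
        }
    }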
GetHDFSTest.java

@@ -19,6 +19,8 @@ package org.apache.nifi.processors.hadoop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -30,8 +32,9 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Assert;
import org.junit.Test;
@@ -105,6 +108,19 @@ public class GetHDFSTest {
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Minimum File Age cannot be greater than Maximum File Age"));
}
results = new HashSet<>();
runner.setProperty(GetHDFS.DIRECTORY, "/target");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}
}
@Test
@@ -115,9 +131,25 @@ public class GetHDFSTest {
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(3, flowFiles.size());
assertEquals(4, flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("random"));
}
}
@Test
public void testGetFilesWithCompression() throws IOException {
TestRunner runner = TestRunners.newTestRunner(GetHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "src/test/resources/testdata");
runner.setProperty(GetHDFS.FILE_FILTER_REGEX, "random.*.gz");
runner.setProperty(GetHDFS.COMPRESSION_CODEC, GzipCodec.class.getName());
runner.setProperty(GetHDFS.KEEP_SOURCE_FILE, "true");
runner.run();
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetHDFS.REL_SUCCESS);
assertEquals(1, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
assertTrue(flowFile.getAttribute(CoreAttributes.FILENAME.key()).startsWith("randombytes-1.gz"));
InputStream expected = getClass().getResourceAsStream("/testdata/randombytes-1");
flowFile.assertContentEquals(expected);
}
}

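Note: testGetFilesWithCompression reads the gzipped fixture randombytes-1.gz and asserts the FlowFile content equals the uncompressed randombytes-1, i.e. the configured GzipCodec is applied on read. One plausible way such a fixture could be produced with the same codec (not part of this commit; the paths follow the test's resource layout, the class name is illustrative):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class MakeGzipFixtureSketch {
        public static void main(String[] args) throws Exception {
            // Resolve GzipCodec through the factory so it receives a Configuration.
            Configuration conf = new Configuration();
            CompressionCodec codec = new CompressionCodecFactory(conf)
                    .getCodecByClassName(GzipCodec.class.getName());

            // Compress the uncompressed test file into the .gz fixture the test reads.
            try (InputStream in = new FileInputStream("src/test/resources/testdata/randombytes-1");
                 OutputStream out = codec.createOutputStream(
                         new FileOutputStream("src/test/resources/testdata/randombytes-1.gz"))) {
                IOUtils.copy(in, out);
            }
        }
    }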
PutHDFSTest.java

@@ -17,6 +17,7 @@
package org.apache.nifi.processors.hadoop;
import org.apache.nifi.processors.hadoop.PutHDFS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -33,10 +34,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -136,6 +137,20 @@ public class PutHDFSTest {
for (ValidationResult vr : results) {
assertTrue(vr.toString().contains("is invalid because octal umask [2000] is not a valid umask"));
}
results = new HashSet<>();
runner = TestRunners.newTestRunner(PutHDFS.class);
runner.setProperty(PutHDFS.DIRECTORY, "/target");
runner.setProperty(PutHDFS.COMPRESSION_CODEC, CompressionCodec.class.getName());
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
Assert.assertTrue(vr.toString().contains("is invalid because Given value not found in allowed set"));
}
}
// The following only seems to work from cygwin...something about not finding the 'chmod' command.