Merge branch 'NIFI-591' into develop

This commit is contained in:
Mark Payne 2015-06-07 08:08:47 -04:00
commit 68be759fe4
2 changed files with 90 additions and 39 deletions

View File

@ -68,11 +68,11 @@ import org.tukaani.xz.XZOutputStream;
@SupportsBatching
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate")
+ "attribute as appropriate")
@ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to "
+ "determine the compression type. Otherwise, this attribute is ignored.")
+ "determine the compression type. Otherwise, this attribute is ignored.")
@WritesAttribute(attribute = "mime.type", description = "If the Mode property is set to compress, the appropriate MIME Type is set. If the Mode "
+ "property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.")
+ "property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.")
public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute";
@ -85,44 +85,44 @@ public class CompressContent extends AbstractProcessor {
public static final String MODE_DECOMPRESS = "decompress";
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("Compression Format")
.description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
.name("Compression Format")
.description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
.name("Compression Level")
.description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing "
+ "but less compression; a value of 0 indicates no compression but simply archiving")
.defaultValue("1")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.build();
.name("Compression Level")
.description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing "
+ "but less compression; a value of 0 indicates no compression but simply archiving")
.defaultValue("1")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.build();
public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
.name("Mode")
.description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'")
.allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
.defaultValue(MODE_COMPRESS)
.required(true)
.build();
.name("Mode")
.description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'")
.allowableValues(MODE_COMPRESS, MODE_DECOMPRESS)
.defaultValue(MODE_COMPRESS)
.required(true)
.build();
public static final PropertyDescriptor UPDATE_FILENAME = new PropertyDescriptor.Builder()
.name("Update Filename")
.description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate "
+ "compression format) and add the appropriate extension when compressing data")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
.name("Update Filename")
.description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate "
+ "compression format) and add the appropriate extension when compressing data")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed")
.build();
.name("success")
.description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress")
.build();
.name("failure")
.description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@ -145,6 +145,7 @@ public class CompressContent extends AbstractProcessor {
final Map<String, String> mimeTypeMap = new HashMap<>();
mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP);
mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2);
mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
}
@ -182,7 +183,7 @@ public class CompressContent extends AbstractProcessor {
compressionFormatValue = compressionFormatMimeTypeMap.get(mimeType);
if (compressionFormatValue == null) {
logger.info("Mime Type of {} is '{}', which does not indicate a supported Compression Format; routing to success without decompressing",
new Object[]{flowFile, mimeType});
new Object[]{flowFile, mimeType});
session.transfer(flowFile, REL_SUCCESS);
return;
}
@ -241,7 +242,7 @@ public class CompressContent extends AbstractProcessor {
break;
case COMPRESSION_FORMAT_BZIP2:
default:
mimeTypeRef.set("application/bzip2");
mimeTypeRef.set("application/x-bzip2");
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
break;
}
@ -271,7 +272,7 @@ public class CompressContent extends AbstractProcessor {
}
try (final InputStream in = compressionIn;
final OutputStream out = compressionOut) {
final OutputStream out = compressionOut) {
final byte[] buffer = new byte[8192];
int len;
while ((len = in.read(buffer)) > 0) {
@ -303,7 +304,7 @@ public class CompressContent extends AbstractProcessor {
}
logger.info("Successfully {}ed {} using {} compression format; size changed from {} to {} bytes",
new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression});
new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression});
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (final ProcessException e) {

View File

@ -20,6 +20,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -69,6 +71,54 @@ public class TestCompressContent {
flowFile.assertAttributeEquals("filename", "SampleFile1.txt");
}
@Test
public void testProperMimeTypeFromBzip2() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
runner.setProperty(CompressContent.MODE, "compress");
runner.setProperty(CompressContent.COMPRESSION_FORMAT, "bzip2");
runner.setProperty(CompressContent.UPDATE_FILENAME, "false");
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals("mime.type", "application/x-bzip2");
}
@Test
public void testBzip2DecompressWithBothMimeTypes() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
runner.setProperty(CompressContent.MODE, "decompress");
runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ATTRIBUTE);
runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
// ensure that we can decompress with a mime type of application/x-bzip2
final Map<String, String> attributes = new HashMap<>();
attributes.put("mime.type", "application/x-bzip2");
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.bz2"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
flowFile.assertAttributeEquals("filename", "SampleFile.txt");
// ensure that we can decompress with a mime type of application/bzip2. The appropriate mime type is
// application/x-bzip2, but we used to use application/bzip2. We want to ensure that we are still
// backward compatible.
runner.clearTransferState();
attributes.put("mime.type", "application/bzip2");
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile1.txt.bz2"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
flowFile.assertAttributeEquals("filename", "SampleFile1.txt");
}
@Test
public void testGzipDecompress() throws Exception {
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);