From 83785a97989b31180a5925060604a27084dc62cf Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 5 Jun 2015 14:48:39 -0400 Subject: [PATCH] NIFI-591: Recognize both application/bzip2 and application/x-bzip2 as bzip mime types but use application/x-bzip2 when compressing --- .../processors/standard/CompressContent.java | 79 ++++++++++--------- .../standard/TestCompressContent.java | 50 ++++++++++++ 2 files changed, 90 insertions(+), 39 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index 585902bb83..1b9b20c3f3 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -68,11 +68,11 @@ import org.tukaani.xz.XZOutputStream; @SupportsBatching @Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"}) @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type " - + "attribute as appropriate") + + "attribute as appropriate") @ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to " - + "determine the compression type. Otherwise, this attribute is ignored.") + + "determine the compression type. Otherwise, this attribute is ignored.") @WritesAttribute(attribute = "mime.type", description = "If the Mode property is set to compress, the appropriate MIME Type is set. If the Mode " - + "property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.") + + "property is set to decompress and the file is successfully decompressed, this attribute is removed, as the MIME Type is no longer known.") public class CompressContent extends AbstractProcessor { public static final String COMPRESSION_FORMAT_ATTRIBUTE = "use mime.type attribute"; @@ -85,44 +85,44 @@ public class CompressContent extends AbstractProcessor { public static final String MODE_DECOMPRESS = "decompress"; public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() - .name("Compression Format") - .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA") - .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA) - .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE) - .required(true) - .build(); + .name("Compression Format") + .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA") + .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA) + .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE) + .required(true) + .build(); public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder() - .name("Compression Level") - .description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing " - + "but less compression; a value of 0 indicates no compression but simply archiving") - .defaultValue("1") - .required(true) - .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") - .build(); + .name("Compression Level") + .description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing " + + "but less compression; a value of 0 indicates no compression but simply archiving") + .defaultValue("1") + .required(true) + .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") + .build(); public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() - .name("Mode") - .description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'") - .allowableValues(MODE_COMPRESS, MODE_DECOMPRESS) - .defaultValue(MODE_COMPRESS) - .required(true) - .build(); + .name("Mode") + .description("Indicates whether the processor should compress content or decompress content. Must be either 'compress' or 'decompress'") + .allowableValues(MODE_COMPRESS, MODE_DECOMPRESS) + .defaultValue(MODE_COMPRESS) + .required(true) + .build(); public static final PropertyDescriptor UPDATE_FILENAME = new PropertyDescriptor.Builder() - .name("Update Filename") - .description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate " - + "compression format) and add the appropriate extension when compressing data") - .required(true) - .allowableValues("true", "false") - .defaultValue("false") - .build(); + .name("Update Filename") + .description("If true, will remove the filename extension when decompressing data (only if the extension indicates the appropriate " + + "compression format) and add the appropriate extension when compressing data") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed") - .build(); + .name("success") + .description("FlowFiles will be transferred to the success relationship after successfully being compressed or decompressed") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress") - .build(); + .name("failure") + .description("FlowFiles will be transferred to the failure relationship if they fail to compress/decompress") + .build(); private List properties; private Set relationships; @@ -145,6 +145,7 @@ public class CompressContent extends AbstractProcessor { final Map mimeTypeMap = new HashMap<>(); mimeTypeMap.put("application/gzip", COMPRESSION_FORMAT_GZIP); mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2); + mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2); mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA); this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap); } @@ -182,7 +183,7 @@ public class CompressContent extends AbstractProcessor { compressionFormatValue = compressionFormatMimeTypeMap.get(mimeType); if (compressionFormatValue == null) { logger.info("Mime Type of {} is '{}', which does not indicate a supported Compression Format; routing to success without decompressing", - new Object[]{flowFile, mimeType}); + new Object[]{flowFile, mimeType}); session.transfer(flowFile, REL_SUCCESS); return; } @@ -241,7 +242,7 @@ public class CompressContent extends AbstractProcessor { break; case COMPRESSION_FORMAT_BZIP2: default: - mimeTypeRef.set("application/bzip2"); + mimeTypeRef.set("application/x-bzip2"); compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut); break; } @@ -271,7 +272,7 @@ public class CompressContent extends AbstractProcessor { } try (final InputStream in = compressionIn; - final OutputStream out = compressionOut) { + final OutputStream out = compressionOut) { final byte[] buffer = new byte[8192]; int len; while ((len = in.read(buffer)) > 0) { @@ -303,7 +304,7 @@ public class CompressContent extends AbstractProcessor { } logger.info("Successfully {}ed {} using {} compression format; size changed from {} to {} bytes", - new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression}); + new Object[]{compressionMode.toLowerCase(), flowFile, compressionFormat, sizeBeforeCompression, sizeAfterCompression}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getDuration(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); } catch (final ProcessException e) { diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java index 699db39e79..68cba4d991 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -69,6 +71,54 @@ public class TestCompressContent { flowFile.assertAttributeEquals("filename", "SampleFile1.txt"); } + @Test + public void testProperMimeTypeFromBzip2() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, "compress"); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, "bzip2"); + runner.setProperty(CompressContent.UPDATE_FILENAME, "false"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals("mime.type", "application/x-bzip2"); + } + + @Test + public void testBzip2DecompressWithBothMimeTypes() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, "decompress"); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ATTRIBUTE); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + // ensure that we can decompress with a mime type of application/x-bzip2 + final Map attributes = new HashMap<>(); + attributes.put("mime.type", "application/x-bzip2"); + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.bz2"), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + flowFile.assertAttributeEquals("filename", "SampleFile.txt"); + + // ensure that we can decompress with a mime type of application/bzip2. The appropriate mime type is + // application/x-bzip2, but we used to use application/bzip2. We want to ensure that we are still + // backward compatible. + runner.clearTransferState(); + attributes.put("mime.type", "application/bzip2"); + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile1.txt.bz2"), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + flowFile.assertAttributeEquals("filename", "SampleFile1.txt"); + } + + @Test public void testGzipDecompress() throws Exception { final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);