diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 95d140a4f1..5494206242 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -210,6 +210,11 @@ language governing permissions and limitations under the License. --> derby test + + org.xerial.snappy + snappy-java + 1.1.2 + com.h2database h2 diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java index 593cf44f6d..3ef97464d8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CompressContent.java @@ -64,12 +64,16 @@ import org.tukaani.xz.XZOutputStream; import lzma.sdk.lzma.Decoder; import lzma.streams.LzmaInputStream; import lzma.streams.LzmaOutputStream; +import org.xerial.snappy.SnappyFramedInputStream; +import org.xerial.snappy.SnappyFramedOutputStream; +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; @EventDriven @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"}) +@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed"}) @CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type " + "attribute as appropriate") @ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to " @@ -83,14 +87,17 @@ public class CompressContent extends AbstractProcessor { public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2"; public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2"; public static final String COMPRESSION_FORMAT_LZMA = "lzma"; + public static final String COMPRESSION_FORMAT_SNAPPY = "snappy"; + public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed"; public static final String MODE_COMPRESS = "compress"; public static final String MODE_DECOMPRESS = "decompress"; public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() .name("Compression Format") - .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA") - .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA) + .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed") + .allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, + COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED) .defaultValue(COMPRESSION_FORMAT_ATTRIBUTE) .required(true) .build(); @@ -150,6 +157,8 @@ public class CompressContent extends AbstractProcessor { mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2); mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2); mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA); + mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY); + mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED); this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap); } @@ -210,6 +219,12 @@ public class CompressContent extends AbstractProcessor { case COMPRESSION_FORMAT_BZIP2: fileExtension = ".bz2"; break; + case COMPRESSION_FORMAT_SNAPPY: + fileExtension = ".snappy"; + break; + case COMPRESSION_FORMAT_SNAPPY_FRAMED: + fileExtension = ".sz"; + break; default: fileExtension = ""; break; @@ -243,6 +258,14 @@ public class CompressContent extends AbstractProcessor { compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options()); mimeTypeRef.set("application/x-xz"); break; + case COMPRESSION_FORMAT_SNAPPY: + compressionOut = new SnappyOutputStream(bufferedOut); + mimeTypeRef.set("application/x-snappy"); + break; + case COMPRESSION_FORMAT_SNAPPY_FRAMED: + compressionOut = new SnappyFramedOutputStream(bufferedOut); + mimeTypeRef.set("application/x-snappy-framed"); + break; case COMPRESSION_FORMAT_BZIP2: default: mimeTypeRef.set("application/x-bzip2"); @@ -265,6 +288,12 @@ public class CompressContent extends AbstractProcessor { case COMPRESSION_FORMAT_GZIP: compressionIn = new GzipCompressorInputStream(bufferedIn, true); break; + case COMPRESSION_FORMAT_SNAPPY: + compressionIn = new SnappyInputStream(bufferedIn); + break; + case COMPRESSION_FORMAT_SNAPPY_FRAMED: + compressionIn = new SnappyFramedInputStream(bufferedIn); + break; default: compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java index 68cba4d991..5f96036660 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestCompressContent.java @@ -23,6 +23,7 @@ import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -30,6 +31,70 @@ import org.junit.Test; public class TestCompressContent { + @Test + public void testSnappyCompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy"); + flowFile.assertAttributeEquals("filename", "SampleFile.txt.snappy"); + } + + @Test + public void testSnappyDecompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.snappy")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + flowFile.assertAttributeEquals("filename", "SampleFile.txt"); + } + + @Test + public void testSnappyFramedCompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy-framed"); + flowFile.assertAttributeEquals("filename", "SampleFile.txt.sz"); + } + + @Test + public void testSnappyFramedDecompress() throws Exception { + final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); + runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS); + runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED); + runner.setProperty(CompressContent.UPDATE_FILENAME, "true"); + + runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.sz")); + runner.run(); + + runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt")); + flowFile.assertAttributeEquals("filename", "SampleFile.txt"); + } + @Test public void testBzip2DecompressConcatenated() throws Exception { final TestRunner runner = TestRunners.newTestRunner(CompressContent.class); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy new file mode 100644 index 0000000000..60c384aca0 Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.snappy differ diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz new file mode 100644 index 0000000000..1065381dda Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/CompressedData/SampleFile.txt.sz differ