mirror of https://github.com/apache/nifi.git
NIFI-1357 Add Snappy compression to "CompressContent" Processor
This closes #164. Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
0d13de0cf3
commit
32f476aaa7
|
@ -210,6 +210,11 @@ language governing permissions and limitations under the License. -->
|
|||
<artifactId>derby</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
<version>1.1.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
|
|
|
@ -64,12 +64,16 @@ import org.tukaani.xz.XZOutputStream;
|
|||
import lzma.sdk.lzma.Decoder;
|
||||
import lzma.streams.LzmaInputStream;
|
||||
import lzma.streams.LzmaOutputStream;
|
||||
import org.xerial.snappy.SnappyFramedInputStream;
|
||||
import org.xerial.snappy.SnappyFramedOutputStream;
|
||||
import org.xerial.snappy.SnappyInputStream;
|
||||
import org.xerial.snappy.SnappyOutputStream;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2"})
|
||||
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy framed"})
|
||||
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
|
||||
+ "attribute as appropriate")
|
||||
@ReadsAttribute(attribute = "mime.type", description = "If the Compression Format is set to use mime.type attribute, this attribute is used to "
|
||||
|
@ -83,14 +87,17 @@ public class CompressContent extends AbstractProcessor {
|
|||
public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
|
||||
public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
|
||||
public static final String COMPRESSION_FORMAT_LZMA = "lzma";
|
||||
public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
|
||||
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
|
||||
|
||||
public static final String MODE_COMPRESS = "compress";
|
||||
public static final String MODE_DECOMPRESS = "decompress";
|
||||
|
||||
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
|
||||
.name("Compression Format")
|
||||
.description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, and LZMA")
|
||||
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA)
|
||||
.description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
|
||||
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2,
|
||||
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
|
||||
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
|
||||
.required(true)
|
||||
.build();
|
||||
|
@ -150,6 +157,8 @@ public class CompressContent extends AbstractProcessor {
|
|||
mimeTypeMap.put("application/bzip2", COMPRESSION_FORMAT_BZIP2);
|
||||
mimeTypeMap.put("application/x-bzip2", COMPRESSION_FORMAT_BZIP2);
|
||||
mimeTypeMap.put("application/x-lzma", COMPRESSION_FORMAT_LZMA);
|
||||
mimeTypeMap.put("application/x-snappy", COMPRESSION_FORMAT_SNAPPY);
|
||||
mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
|
||||
this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
|
||||
}
|
||||
|
||||
|
@ -210,6 +219,12 @@ public class CompressContent extends AbstractProcessor {
|
|||
case COMPRESSION_FORMAT_BZIP2:
|
||||
fileExtension = ".bz2";
|
||||
break;
|
||||
case COMPRESSION_FORMAT_SNAPPY:
|
||||
fileExtension = ".snappy";
|
||||
break;
|
||||
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
|
||||
fileExtension = ".sz";
|
||||
break;
|
||||
default:
|
||||
fileExtension = "";
|
||||
break;
|
||||
|
@ -243,6 +258,14 @@ public class CompressContent extends AbstractProcessor {
|
|||
compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
|
||||
mimeTypeRef.set("application/x-xz");
|
||||
break;
|
||||
case COMPRESSION_FORMAT_SNAPPY:
|
||||
compressionOut = new SnappyOutputStream(bufferedOut);
|
||||
mimeTypeRef.set("application/x-snappy");
|
||||
break;
|
||||
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
|
||||
compressionOut = new SnappyFramedOutputStream(bufferedOut);
|
||||
mimeTypeRef.set("application/x-snappy-framed");
|
||||
break;
|
||||
case COMPRESSION_FORMAT_BZIP2:
|
||||
default:
|
||||
mimeTypeRef.set("application/x-bzip2");
|
||||
|
@ -265,6 +288,12 @@ public class CompressContent extends AbstractProcessor {
|
|||
case COMPRESSION_FORMAT_GZIP:
|
||||
compressionIn = new GzipCompressorInputStream(bufferedIn, true);
|
||||
break;
|
||||
case COMPRESSION_FORMAT_SNAPPY:
|
||||
compressionIn = new SnappyInputStream(bufferedIn);
|
||||
break;
|
||||
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
|
||||
compressionIn = new SnappyFramedInputStream(bufferedIn);
|
||||
break;
|
||||
default:
|
||||
compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.nio.file.Paths;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
@ -30,6 +31,70 @@ import org.junit.Test;
|
|||
|
||||
public class TestCompressContent {
|
||||
|
||||
@Test
|
||||
public void testSnappyCompress() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
|
||||
runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
|
||||
runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY);
|
||||
runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
|
||||
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy");
|
||||
flowFile.assertAttributeEquals("filename", "SampleFile.txt.snappy");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnappyDecompress() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
|
||||
runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
|
||||
runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY);
|
||||
runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.snappy"));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
|
||||
flowFile.assertAttributeEquals("filename", "SampleFile.txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnappyFramedCompress() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
|
||||
runner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
|
||||
runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED);
|
||||
runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
|
||||
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-snappy-framed");
|
||||
flowFile.assertAttributeEquals("filename", "SampleFile.txt.sz");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSnappyFramedDecompress() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
|
||||
runner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
|
||||
runner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_SNAPPY_FRAMED);
|
||||
runner.setProperty(CompressContent.UPDATE_FILENAME, "true");
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.sz"));
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);
|
||||
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
|
||||
flowFile.assertAttributeEquals("filename", "SampleFile.txt");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBzip2DecompressConcatenated() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(CompressContent.class);
|
||||
|
|
Binary file not shown.
Binary file not shown.
Loading…
Reference in New Issue