NIFI-2827 Add zstd-jni to the necessary pom.xml files. It's already in the LICENSE.

NIFI-2827 Update CompressContent.java to use zstd compression format
NIFI-2827 Update test cases for CompressContent.java to include zstd format
NIFI-2827 Update JsonRecordSetWriter.java to enable zstd compression format

This closes #6294

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Matthew Hawkins 2022-08-12 02:36:27 +10:00 committed by Mike Thomsen
parent 6ac7bfe0fa
commit 64acd8b1c5
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
7 changed files with 70 additions and 6 deletions

View File

@ -504,6 +504,11 @@ limitations under the License.
<artifactId>lzma-java</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.2-3</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>

View File

@ -221,6 +221,10 @@
<groupId>com.github.jponge</groupId>
<artifactId>lzma-java</artifactId>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>

View File

@ -23,6 +23,8 @@ import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -82,7 +84,7 @@ import java.util.zip.InflaterInputStream;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate"})
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate", "zstd"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size "
+ "are generally fine to process")
@ -104,16 +106,17 @@ public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_SNAPPY_HADOOP = "snappy-hadoop";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
public static final String COMPRESSION_FORMAT_ZSTD = "zstd";
public static final String MODE_COMPRESS = "compress";
public static final String MODE_DECOMPRESS = "decompress";
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("Compression Format")
.description("The compression format to use. Valid values are: GZIP, Deflate, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
.description("The compression format to use. Valid values are: GZIP, Deflate, ZSTD, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP, COMPRESSION_FORMAT_SNAPPY_FRAMED,
COMPRESSION_FORMAT_LZ4_FRAMED)
COMPRESSION_FORMAT_LZ4_FRAMED, COMPRESSION_FORMAT_ZSTD)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
@ -132,7 +135,7 @@ public class CompressContent extends AbstractProcessor {
.defaultValue("1")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_XZ_LZMA2)
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_ZSTD)
.dependsOn(MODE, MODE_COMPRESS)
.build();
@ -184,6 +187,7 @@ public class CompressContent extends AbstractProcessor {
mimeTypeMap.put("application/x-snappy-hadoop", COMPRESSION_FORMAT_SNAPPY_HADOOP);
mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
mimeTypeMap.put("application/x-lz4-framed", COMPRESSION_FORMAT_LZ4_FRAMED);
mimeTypeMap.put("application/zstd", COMPRESSION_FORMAT_ZSTD);
this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
}
@ -273,6 +277,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_LZ4_FRAMED:
fileExtension = ".lz4";
break;
case COMPRESSION_FORMAT_ZSTD:
fileExtension = ".zst";
break;
default:
fileExtension = "";
break;
@ -328,6 +335,11 @@ public class CompressContent extends AbstractProcessor {
mimeTypeRef.set("application/x-lz4-framed");
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
break;
case COMPRESSION_FORMAT_ZSTD:
final int zstdcompressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger() * 2;
compressionOut = new ZstdCompressorOutputStream(bufferedOut, zstdcompressionLevel);
mimeTypeRef.set("application/zstd");
break;
case COMPRESSION_FORMAT_BZIP2:
default:
mimeTypeRef.set("application/x-bzip2");
@ -364,6 +376,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_LZ4_FRAMED:
compressionIn = new FramedLZ4CompressorInputStream(bufferedIn, true);
break;
case COMPRESSION_FORMAT_ZSTD:
compressionIn = new ZstdCompressorInputStream(bufferedIn);
break;
default:
compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
}

View File

@ -347,4 +347,34 @@ public class TestCompressContent {
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
flowFile.assertAttributeEquals("filename", "SampleFile.txt");
}
/**
 * Runs the processor in compress mode with the zstd format against the plain-text
 * sample file, with filename updating enabled, and verifies that the single resulting
 * FlowFile carries the "application/zstd" MIME type and the filename "SampleFile.txt.zst".
 */
@Test
public void testZstdCompress() throws Exception {
    // Arrange: processor configured for zstd compression.
    final TestRunner zstdRunner = TestRunners.newTestRunner(CompressContent.class);
    zstdRunner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
    zstdRunner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ZSTD);
    zstdRunner.setProperty(CompressContent.UPDATE_FILENAME, "true");

    // Act: push the uncompressed sample through the processor once.
    zstdRunner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
    zstdRunner.run();

    // Assert: exactly one FlowFile succeeded, with the expected attributes.
    zstdRunner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
    final MockFlowFile compressedFlowFile = zstdRunner
            .getFlowFilesForRelationship(CompressContent.REL_SUCCESS)
            .get(0);
    compressedFlowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zstd");
    compressedFlowFile.assertAttributeEquals("filename", "SampleFile.txt.zst");
}
/**
 * Runs the processor in decompress mode with the zstd format against the pre-compressed
 * sample file, with filename updating enabled, and verifies that the single resulting
 * FlowFile's content equals the plain-text sample and its filename is "SampleFile.txt".
 */
@Test
public void testZstdDecompress() throws Exception {
    // Arrange: processor configured for zstd decompression.
    final TestRunner zstdRunner = TestRunners.newTestRunner(CompressContent.class);
    zstdRunner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
    zstdRunner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_ZSTD);
    zstdRunner.setProperty(CompressContent.UPDATE_FILENAME, "true");

    // Act: push the zstd-compressed sample through the processor once.
    zstdRunner.enqueue(Paths.get("src/test/resources/CompressedData/SampleFile.txt.zst"));
    zstdRunner.run();

    // Assert: exactly one FlowFile succeeded and it matches the original text file.
    zstdRunner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
    final MockFlowFile decompressedFlowFile = zstdRunner
            .getFlowFilesForRelationship(CompressContent.REL_SUCCESS)
            .get(0);
    decompressedFlowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
    decompressedFlowFile.assertAttributeEquals("filename", "SampleFile.txt");
}
}

View File

@ -148,6 +148,11 @@
<artifactId>lzma-java</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.2-3</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>

View File

@ -72,6 +72,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
public static final String COMPRESSION_FORMAT_NONE = "none";
public static final String COMPRESSION_FORMAT_ZSTD = "zstd";
static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
.name("suppress-nulls")
@ -101,9 +102,9 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("compression-format")
.displayName("Compression Format")
.description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
.description("The compression format to use. Valid values are: GZIP, BZIP2, ZSTD, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
.allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2,
COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED, COMPRESSION_FORMAT_ZSTD)
.defaultValue(COMPRESSION_FORMAT_NONE)
.required(true)
.build();
@ -203,6 +204,10 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
mimeTypeRef = "application/x-bzip2";
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
break;
case COMPRESSION_FORMAT_ZSTD:
mimeTypeRef = "application/zstd";
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
break;
default:
mimeTypeRef = "application/json";
compressionOut = out;