NIFI-10436 Initial basic brotli support

NIFI-10436 Add Brotli-compressed SampleFile.txt.br for unit testing
NIFI-10436 Fixup basic usage of Brotli4j API
NIFI-10436 NOTICE ok, LICENSE updated
NIFI-10436 style fixup CompressContent.java
NIFI-10436 Update CompressContent.java

This closes #6432

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Matthew Hawkins 2022-09-17 21:13:30 +10:00 committed by Mike Thomsen
parent 8c2b8cb6bc
commit 3e60414d48
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
6 changed files with 69 additions and 5 deletions

View File

@ -3120,4 +3120,7 @@ which is available under an MIT license.
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
THE SOFTWARE.
This product bundles 'Brotli4j', which is available under the Apache License 2.0.

View File

@ -229,6 +229,11 @@
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
<!-- Brotli4j: provides the Brotli streams, encoder and native loader imported by CompressContent -->
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>brotli4j</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>

View File

@ -25,6 +25,10 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream;
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
import com.aayushatharva.brotli4j.decoder.BrotliInputStream;
import com.aayushatharva.brotli4j.encoder.BrotliOutputStream;
import com.aayushatharva.brotli4j.Brotli4jLoader;
import com.aayushatharva.brotli4j.encoder.Encoder;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -84,7 +88,7 @@ import java.util.zip.InflaterInputStream;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate", "zstd"})
@Tags({"content", "compress", "decompress", "gzip", "bzip2", "lzma", "xz-lzma2", "snappy", "snappy-hadoop", "snappy framed", "lz4-framed", "deflate", "zstd", "brotli"})
@CapabilityDescription("Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type "
+ "attribute as appropriate. This processor operates in a very memory efficient way so very large objects well beyond the heap size "
+ "are generally fine to process")
@ -107,16 +111,17 @@ public class CompressContent extends AbstractProcessor {
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
public static final String COMPRESSION_FORMAT_LZ4_FRAMED ="lz4-framed";
public static final String COMPRESSION_FORMAT_ZSTD = "zstd";
public static final String COMPRESSION_FORMAT_BROTLI = "brotli";
public static final String MODE_COMPRESS = "compress";
public static final String MODE_DECOMPRESS = "decompress";
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("Compression Format")
.description("The compression format to use. Valid values are: GZIP, Deflate, ZSTD, BZIP2, XZ-LZMA2, LZMA, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
.description("The compression format to use. Valid values are: GZIP, Deflate, ZSTD, BZIP2, XZ-LZMA2, LZMA, Brotli, Snappy, Snappy Hadoop, Snappy Framed, and LZ4-Framed")
.allowableValues(COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_BZIP2,
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_LZMA, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_HADOOP, COMPRESSION_FORMAT_SNAPPY_FRAMED,
COMPRESSION_FORMAT_LZ4_FRAMED, COMPRESSION_FORMAT_ZSTD)
COMPRESSION_FORMAT_LZ4_FRAMED, COMPRESSION_FORMAT_ZSTD, COMPRESSION_FORMAT_BROTLI)
.defaultValue(COMPRESSION_FORMAT_ATTRIBUTE)
.required(true)
.build();
@ -135,7 +140,8 @@ public class CompressContent extends AbstractProcessor {
.defaultValue("1")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_ZSTD)
.dependsOn(COMPRESSION_FORMAT, COMPRESSION_FORMAT_ATTRIBUTE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_DEFLATE,
COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_ZSTD, COMPRESSION_FORMAT_BROTLI)
.dependsOn(MODE, MODE_COMPRESS)
.build();
@ -188,6 +194,7 @@ public class CompressContent extends AbstractProcessor {
mimeTypeMap.put("application/x-snappy-framed", COMPRESSION_FORMAT_SNAPPY_FRAMED);
mimeTypeMap.put("application/x-lz4-framed", COMPRESSION_FORMAT_LZ4_FRAMED);
mimeTypeMap.put("application/zstd", COMPRESSION_FORMAT_ZSTD);
mimeTypeMap.put("application/x-brotli", COMPRESSION_FORMAT_BROTLI);
this.compressionFormatMimeTypeMap = Collections.unmodifiableMap(mimeTypeMap);
}
@ -280,6 +287,9 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_ZSTD:
fileExtension = ".zst";
break;
case COMPRESSION_FORMAT_BROTLI:
fileExtension = ".br";
break;
default:
fileExtension = "";
break;
@ -340,6 +350,13 @@ public class CompressContent extends AbstractProcessor {
compressionOut = new ZstdCompressorOutputStream(bufferedOut, zstdcompressionLevel);
mimeTypeRef.set("application/zstd");
break;
case COMPRESSION_FORMAT_BROTLI:
Brotli4jLoader.ensureAvailability();
compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
Encoder.Parameters params = new Encoder.Parameters().setQuality(compressionLevel);
compressionOut = new BrotliOutputStream(bufferedOut, params);
mimeTypeRef.set("application/x-brotli");
break;
case COMPRESSION_FORMAT_BZIP2:
default:
mimeTypeRef.set("application/x-bzip2");
@ -379,6 +396,10 @@ public class CompressContent extends AbstractProcessor {
case COMPRESSION_FORMAT_ZSTD:
compressionIn = new ZstdCompressorInputStream(bufferedIn);
break;
case COMPRESSION_FORMAT_BROTLI:
Brotli4jLoader.ensureAvailability();
compressionIn = new BrotliInputStream(bufferedIn);
break;
default:
compressionIn = new CompressorStreamFactory().createCompressorInputStream(compressionFormat.toLowerCase(), bufferedIn);
}

View File

@ -377,4 +377,34 @@ public class TestCompressContent {
flowFile.assertContentEquals(Paths.get("src/test/resources/CompressedData/SampleFile.txt"));
flowFile.assertAttributeEquals("filename", "SampleFile.txt");
}
/**
 * Runs CompressContent in compress mode with the Brotli format against the
 * plain-text sample file and verifies that exactly one FlowFile reaches the
 * success relationship, carrying the Brotli MIME type and a filename with
 * the ".br" suffix.
 */
@Test
public void testBrotliCompress() throws Exception {
    final TestRunner testRunner = TestRunners.newTestRunner(CompressContent.class);

    // Processor configuration: compress, Brotli, and let the processor rename the output.
    testRunner.setProperty(CompressContent.MODE, CompressContent.MODE_COMPRESS);
    testRunner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_BROTLI);
    testRunner.setProperty(CompressContent.UPDATE_FILENAME, "true");

    // Feed the uncompressed sample through a single processor invocation.
    final java.nio.file.Path plainSample = Paths.get("src/test/resources/CompressedData/SampleFile.txt");
    testRunner.enqueue(plainSample);
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
    final MockFlowFile compressed = testRunner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);

    // Only metadata is checked here; the compressed bytes themselves are not compared.
    compressed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-brotli");
    compressed.assertAttributeEquals("filename", "SampleFile.txt.br");
}
/**
 * Runs CompressContent in decompress mode with the Brotli format against the
 * pre-compressed sample file and verifies that the single resulting FlowFile
 * has the same content as the plain-text sample and the filename
 * "SampleFile.txt".
 */
@Test
public void testBrotliDecompress() throws Exception {
    final TestRunner testRunner = TestRunners.newTestRunner(CompressContent.class);

    // Processor configuration: decompress, Brotli, and let the processor rename the output.
    testRunner.setProperty(CompressContent.MODE, CompressContent.MODE_DECOMPRESS);
    testRunner.setProperty(CompressContent.COMPRESSION_FORMAT, CompressContent.COMPRESSION_FORMAT_BROTLI);
    testRunner.setProperty(CompressContent.UPDATE_FILENAME, "true");

    // Feed the Brotli-compressed fixture through a single processor invocation.
    final java.nio.file.Path brotliSample = Paths.get("src/test/resources/CompressedData/SampleFile.txt.br");
    testRunner.enqueue(brotliSample);
    testRunner.run();

    testRunner.assertAllFlowFilesTransferred(CompressContent.REL_SUCCESS, 1);
    final MockFlowFile decompressed = testRunner.getFlowFilesForRelationship(CompressContent.REL_SUCCESS).get(0);

    // The output must match the uncompressed reference file byte for byte.
    final java.nio.file.Path expectedContent = Paths.get("src/test/resources/CompressedData/SampleFile.txt");
    decompressed.assertContentEquals(expectedContent);
    decompressed.assertAttributeEquals("filename", "SampleFile.txt");
}
}

View File

@ -159,6 +159,11 @@
<artifactId>zstd-jni</artifactId>
<version>1.5.2-3</version>
</dependency>
<!-- Brotli4j: the Brotli library imported by the CompressContent processor -->
<dependency>
<groupId>com.aayushatharva.brotli4j</groupId>
<artifactId>brotli4j</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.tukaani</groupId>
<artifactId>xz</artifactId>