NIFI-12578 Updated nifi-compress-bundle using current API methods

This closes #8215

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
EndzeitBegins 2024-01-07 20:27:11 +01:00 committed by exceptionfactory
parent 77cdba1efa
commit 655baec4cb
No known key found for this signature in database
3 changed files with 164 additions and 207 deletions

View File

@ -67,13 +67,11 @@ import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -100,19 +98,8 @@ public class ModifyCompression extends AbstractProcessor {
.name("Input Compression Strategy")
.displayName("Input Compression Strategy")
.description("The strategy to use for decompressing input FlowFiles")
.allowableValues(CompressionStrategy.NONE.asAllowableValue(),
CompressionStrategy.MIME_TYPE_ATTRIBUTE.asAllowableValue(),
CompressionStrategy.GZIP.asAllowableValue(),
CompressionStrategy.DEFLATE.asAllowableValue(),
CompressionStrategy.BZIP2.asAllowableValue(),
CompressionStrategy.XZ_LZMA2.asAllowableValue(),
CompressionStrategy.LZMA.asAllowableValue(),
CompressionStrategy.SNAPPY.asAllowableValue(),
CompressionStrategy.SNAPPY_FRAMED.asAllowableValue(),
CompressionStrategy.LZ4_FRAMED.asAllowableValue(),
CompressionStrategy.ZSTD.asAllowableValue(),
CompressionStrategy.BROTLI.asAllowableValue())
.defaultValue(CompressionStrategy.NONE.getValue())
.allowableValues(EnumSet.complementOf(EnumSet.of(CompressionStrategy.SNAPPY_HADOOP)))
.defaultValue(CompressionStrategy.NONE)
.required(true)
.build();
@ -120,19 +107,8 @@ public class ModifyCompression extends AbstractProcessor {
.name("Output Compression Strategy")
.name("Output Compression Strategy")
.description("The strategy to use for compressing output FlowFiles")
.allowableValues(CompressionStrategy.NONE.asAllowableValue(),
CompressionStrategy.GZIP.asAllowableValue(),
CompressionStrategy.DEFLATE.asAllowableValue(),
CompressionStrategy.BZIP2.asAllowableValue(),
CompressionStrategy.XZ_LZMA2.asAllowableValue(),
CompressionStrategy.LZMA.asAllowableValue(),
CompressionStrategy.SNAPPY.asAllowableValue(),
CompressionStrategy.SNAPPY_HADOOP.asAllowableValue(),
CompressionStrategy.SNAPPY_FRAMED.asAllowableValue(),
CompressionStrategy.LZ4_FRAMED.asAllowableValue(),
CompressionStrategy.ZSTD.asAllowableValue(),
CompressionStrategy.BROTLI.asAllowableValue())
.defaultValue(CompressionStrategy.NONE.getValue())
.allowableValues(EnumSet.complementOf(EnumSet.of(CompressionStrategy.MIME_TYPE_ATTRIBUTE)))
.defaultValue(CompressionStrategy.NONE)
.required(true)
.build();
@ -160,7 +136,7 @@ public class ModifyCompression extends AbstractProcessor {
.description("Processing strategy for filename attribute on output FlowFiles")
.required(true)
.allowableValues(FilenameStrategy.class)
.defaultValue(FilenameStrategy.UPDATED.getValue())
.defaultValue(FilenameStrategy.UPDATED)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -173,17 +149,14 @@ public class ModifyCompression extends AbstractProcessor {
.description("FlowFiles will be transferred to the failure relationship on compression modification errors")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
private static final List<PropertyDescriptor> PROPERTIES = List.of(
INPUT_COMPRESSION_STRATEGY,
OUTPUT_COMPRESSION_STRATEGY,
OUTPUT_COMPRESSION_LEVEL,
OUTPUT_FILENAME_STRATEGY
));
);
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE);
private static final Map<String, CompressionStrategy> compressionFormatMimeTypeMap;
@ -222,7 +195,7 @@ public class ModifyCompression extends AbstractProcessor {
}
final CompressionStrategy inputCompressionStrategy;
final CompressionStrategy configuredInputCompressionStrategy = getCompressionStrategy(context.getProperty(INPUT_COMPRESSION_STRATEGY).getValue());
final CompressionStrategy configuredInputCompressionStrategy = context.getProperty(INPUT_COMPRESSION_STRATEGY).asDescribedValue(CompressionStrategy.class);
if (CompressionStrategy.MIME_TYPE_ATTRIBUTE == configuredInputCompressionStrategy) {
final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
if (mimeType == null) {
@ -241,7 +214,7 @@ public class ModifyCompression extends AbstractProcessor {
inputCompressionStrategy = configuredInputCompressionStrategy;
}
final CompressionStrategy outputCompressionStrategy = getCompressionStrategy(context.getProperty(OUTPUT_COMPRESSION_STRATEGY).getValue());
final CompressionStrategy outputCompressionStrategy = context.getProperty(OUTPUT_COMPRESSION_STRATEGY).asDescribedValue(CompressionStrategy.class);
final AtomicReference<String> mimeTypeRef = new AtomicReference<>(null);
final StopWatch stopWatch = new StopWatch(true);
final long inputFileSize = flowFile.getSize();
@ -284,40 +257,34 @@ public class ModifyCompression extends AbstractProcessor {
}
private InputStream getCompressionInputStream(final CompressionStrategy compressionFormat, final InputStream parentInputStream) throws IOException {
if (CompressionStrategy.LZMA == compressionFormat) {
return new LzmaInputStream(parentInputStream, new Decoder());
} else if (CompressionStrategy.XZ_LZMA2 == compressionFormat) {
return new XZInputStream(parentInputStream);
} else if (CompressionStrategy.BZIP2 == compressionFormat) {
// need this two-arg constructor to support concatenated streams
return new BZip2CompressorInputStream(parentInputStream, true);
} else if (CompressionStrategy.GZIP == compressionFormat) {
return new GzipCompressorInputStream(parentInputStream, true);
} else if (CompressionStrategy.DEFLATE == compressionFormat) {
return new InflaterInputStream(parentInputStream);
} else if (CompressionStrategy.SNAPPY == compressionFormat) {
return new SnappyInputStream(parentInputStream);
} else if (CompressionStrategy.SNAPPY_HADOOP == compressionFormat) {
throw new IOException("Cannot decompress snappy-hadoop");
} else if (CompressionStrategy.SNAPPY_FRAMED == compressionFormat) {
return new SnappyFramedInputStream(parentInputStream);
} else if (CompressionStrategy.LZ4_FRAMED == compressionFormat) {
return new FramedLZ4CompressorInputStream(parentInputStream, true);
} else if (CompressionStrategy.ZSTD == compressionFormat) {
return new ZstdCompressorInputStream(parentInputStream);
} else if (CompressionStrategy.BROTLI == compressionFormat) {
Brotli4jLoader.ensureAvailability();
return new BrotliInputStream(parentInputStream);
} else if (CompressionStrategy.NONE == compressionFormat) {
return parentInputStream;
} else {
final String compressorStreamFormat = compressionFormat.getValue().toLowerCase();
try {
return new CompressorStreamFactory().createCompressorInputStream(compressorStreamFormat, parentInputStream);
} catch (final CompressorException e) {
throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e);
return switch (compressionFormat) {
case LZMA -> new LzmaInputStream(parentInputStream, new Decoder());
case XZ_LZMA2 -> new XZInputStream(parentInputStream);
case BZIP2 -> {
// need this two-arg constructor to support concatenated streams
yield new BZip2CompressorInputStream(parentInputStream, true);
}
}
case GZIP -> new GzipCompressorInputStream(parentInputStream, true);
case DEFLATE -> new InflaterInputStream(parentInputStream);
case SNAPPY -> new SnappyInputStream(parentInputStream);
case SNAPPY_HADOOP -> throw new IOException("Cannot decompress snappy-hadoop");
case SNAPPY_FRAMED -> new SnappyFramedInputStream(parentInputStream);
case LZ4_FRAMED -> new FramedLZ4CompressorInputStream(parentInputStream, true);
case ZSTD -> new ZstdCompressorInputStream(parentInputStream);
case BROTLI -> {
Brotli4jLoader.ensureAvailability();
yield new BrotliInputStream(parentInputStream);
}
case NONE -> parentInputStream;
default -> {
final String compressorStreamFormat = compressionFormat.getValue().toLowerCase();
try {
yield new CompressorStreamFactory().createCompressorInputStream(compressorStreamFormat, parentInputStream);
} catch (final CompressorException e) {
throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e);
}
}
};
}
private OutputStream getCompressionOutputStream(
@ -327,54 +294,65 @@ public class ModifyCompression extends AbstractProcessor {
final OutputStream parentOutputStream
) throws IOException {
final OutputStream compressionOut;
if (CompressionStrategy.GZIP == compressionFormat) {
compressionOut = new GZIPOutputStream(parentOutputStream, compressionLevel);
mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]);
} else if (CompressionStrategy.DEFLATE == compressionFormat) {
compressionOut = new DeflaterOutputStream(parentOutputStream, new Deflater(compressionLevel));
mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]);
} else if (CompressionStrategy.LZMA == compressionFormat) {
compressionOut = new LzmaOutputStream.Builder(parentOutputStream).build();
mimeTypeRef.set(CompressionStrategy.LZMA.getMimeTypes()[0]);
} else if (CompressionStrategy.XZ_LZMA2 == compressionFormat) {
compressionOut = new XZOutputStream(parentOutputStream, new LZMA2Options(compressionLevel));
mimeTypeRef.set(CompressionStrategy.XZ_LZMA2.getMimeTypes()[0]);
} else if (CompressionStrategy.SNAPPY == compressionFormat) {
compressionOut = new SnappyOutputStream(parentOutputStream);
mimeTypeRef.set(CompressionStrategy.SNAPPY.getMimeTypes()[0]);
} else if (CompressionStrategy.SNAPPY_HADOOP == compressionFormat) {
compressionOut = new SnappyHadoopCompatibleOutputStream(parentOutputStream);
mimeTypeRef.set(CompressionStrategy.SNAPPY_HADOOP.getMimeTypes()[0]);
} else if (CompressionStrategy.SNAPPY_FRAMED == compressionFormat) {
compressionOut = new SnappyFramedOutputStream(parentOutputStream);
mimeTypeRef.set(CompressionStrategy.SNAPPY_FRAMED.getMimeTypes()[0]);
} else if (CompressionStrategy.LZ4_FRAMED == compressionFormat) {
final String compressorStreamFormat = compressionFormat.getValue().toLowerCase();
try {
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream);
} catch (final CompressorException e) {
throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e);
switch (compressionFormat) {
case GZIP -> {
compressionOut = new GZIPOutputStream(parentOutputStream, compressionLevel);
mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]);
}
mimeTypeRef.set(CompressionStrategy.LZ4_FRAMED.getMimeTypes()[0]);
} else if (CompressionStrategy.ZSTD == compressionFormat) {
final int outputCompressionLevel = compressionLevel * 2;
compressionOut = new ZstdCompressorOutputStream(parentOutputStream, outputCompressionLevel);
mimeTypeRef.set(CompressionStrategy.ZSTD.getMimeTypes()[0]);
} else if (CompressionStrategy.BROTLI == compressionFormat) {
Brotli4jLoader.ensureAvailability();
Encoder.Parameters params = new Encoder.Parameters().setQuality(compressionLevel);
compressionOut = new BrotliOutputStream(parentOutputStream, params);
mimeTypeRef.set(CompressionStrategy.BROTLI.getMimeTypes()[0]);
} else if (CompressionStrategy.BZIP2 == compressionFormat) {
final String compressorStreamFormat = compressionFormat.getValue().toLowerCase();
try {
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream);
} catch (final CompressorException e) {
throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e);
case DEFLATE -> {
compressionOut = new DeflaterOutputStream(parentOutputStream, new Deflater(compressionLevel));
mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]);
}
mimeTypeRef.set(CompressionStrategy.BZIP2.getMimeTypes()[0]);
} else {
compressionOut = parentOutputStream;
case LZMA -> {
compressionOut = new LzmaOutputStream.Builder(parentOutputStream).build();
mimeTypeRef.set(CompressionStrategy.LZMA.getMimeTypes()[0]);
}
case XZ_LZMA2 -> {
compressionOut = new XZOutputStream(parentOutputStream, new LZMA2Options(compressionLevel));
mimeTypeRef.set(CompressionStrategy.XZ_LZMA2.getMimeTypes()[0]);
}
case SNAPPY -> {
compressionOut = new SnappyOutputStream(parentOutputStream);
mimeTypeRef.set(CompressionStrategy.SNAPPY.getMimeTypes()[0]);
}
case SNAPPY_HADOOP -> {
compressionOut = new SnappyHadoopCompatibleOutputStream(parentOutputStream);
mimeTypeRef.set(CompressionStrategy.SNAPPY_HADOOP.getMimeTypes()[0]);
}
case SNAPPY_FRAMED -> {
compressionOut = new SnappyFramedOutputStream(parentOutputStream);
mimeTypeRef.set(CompressionStrategy.SNAPPY_FRAMED.getMimeTypes()[0]);
}
case LZ4_FRAMED -> {
final String compressorStreamFormat = compressionFormat.getValue().toLowerCase();
try {
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream);
} catch (final CompressorException e) {
throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e);
}
mimeTypeRef.set(CompressionStrategy.LZ4_FRAMED.getMimeTypes()[0]);
}
case ZSTD -> {
final int outputCompressionLevel = compressionLevel * 2;
compressionOut = new ZstdCompressorOutputStream(parentOutputStream, outputCompressionLevel);
mimeTypeRef.set(CompressionStrategy.ZSTD.getMimeTypes()[0]);
}
case BROTLI -> {
Brotli4jLoader.ensureAvailability();
Encoder.Parameters params = new Encoder.Parameters().setQuality(compressionLevel);
compressionOut = new BrotliOutputStream(parentOutputStream, params);
mimeTypeRef.set(CompressionStrategy.BROTLI.getMimeTypes()[0]);
}
case BZIP2 -> {
final String compressorStreamFormat = compressionFormat.getValue().toLowerCase();
try {
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream);
} catch (final CompressorException e) {
throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e);
}
mimeTypeRef.set(CompressionStrategy.BZIP2.getMimeTypes()[0]);
}
case null, default -> compressionOut = parentOutputStream;
}
return compressionOut;
}
@ -391,9 +369,4 @@ public class ModifyCompression extends AbstractProcessor {
}
return truncatedFilename + outputCompressionStrategy.getFileExtension();
}
private CompressionStrategy getCompressionStrategy(final String propertyValue) {
final Optional<CompressionStrategy> compressionInfo = CompressionStrategy.findValue(propertyValue);
return compressionInfo.orElseThrow(() -> new IllegalArgumentException(String.format("Compression Format [%s] not supported", propertyValue)));
}
}

View File

@ -16,14 +16,9 @@
*/
package org.apache.nifi.processors.compress.property;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import java.util.Arrays;
import java.util.Optional;
public enum CompressionStrategy implements DescribedValue {
NONE("no compression", "No Compression", ""),
MIME_TYPE_ATTRIBUTE("use mime.type attribute", "Use the [mime.type] attribute from the input FlowFile to determine the format", ""),
GZIP("gzip", "GZIP", ".gz","application/gzip", "application/x-gzip"),
@ -43,12 +38,6 @@ public enum CompressionStrategy implements DescribedValue {
private final String fileExtension;
private final String[] mimeTypes;
public static Optional<CompressionStrategy> findValue(final String value) {
return Arrays.stream(CompressionStrategy.values())
.filter((compressionStrategy -> compressionStrategy.getValue().equalsIgnoreCase(value)))
.findFirst();
}
CompressionStrategy(final String value, final String description, final String fileExtension, final String... mimeTypes) {
this.value = value;
this.description = description;
@ -78,8 +67,4 @@ public enum CompressionStrategy implements DescribedValue {
public String[] getMimeTypes() {
return mimeTypes;
}
public AllowableValue asAllowableValue() {
return new AllowableValue(value, value, description);
}
}

View File

@ -48,107 +48,107 @@ class TestModifyCompression {
@Test
public void testSnappyCompress() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.SNAPPY.getMimeTypes()[0]);
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.snappy");
}
@Test
public void testSnappyDecompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt.snappy"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
}
@Test
public void testSnappyHadoopCompress() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.SNAPPY_HADOOP.getMimeTypes()[0]);
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.snappy");
}
@Test
public void testSnappyHadoopDecompress() {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.assertNotValid();
}
@Test
public void testSnappyFramedCompress() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.SNAPPY_FRAMED.getMimeTypes()[0]);
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.sz");
}
@Test
public void testSnappyFramedDecompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt.sz"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
}
@Test
public void testBzip2DecompressConcatenated() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL);
runner.enqueue(getSamplePath("SampleFileConcat.txt.bz2"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFileConcat.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFileConcat.txt.bz2"); // not updating filename
}
@Test
public void testBzip2DecompressLz4FramedCompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2);
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt.bz2"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.lz4");
runner.clearTransferState();
@ -156,27 +156,27 @@ class TestModifyCompression {
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile1.txt.lz4");
}
@Test
public void testProperMimeTypeFromBzip2() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL);
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-bzip2");
}
@Test
public void testBzip2DecompressWithBothMimeTypes() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
// ensure that we can decompress with a mime type of application/x-bzip2
final Map<String, String> attributes = new HashMap<>();
@ -185,7 +185,7 @@ class TestModifyCompression {
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
@ -198,21 +198,21 @@ class TestModifyCompression {
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile1.txt");
}
@Test
public void testGzipDecompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP.getValue());
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP);
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid());
runner.enqueue(getSamplePath("/SampleFile.txt.gz"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
@ -221,83 +221,82 @@ class TestModifyCompression {
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile1.txt");
runner.clearTransferState();
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE);
Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.GZIP.getMimeTypes()[0]);
runner.enqueue(getSamplePath("SampleFile.txt.gz"), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
}
@Test
public void testDeflateDecompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE.getValue());
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE);
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid());
runner.enqueue(getSamplePath("SampleFile.txt.zlib"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
System.err.println(new String(flowFile.toByteArray()));
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
}
@Test
public void testDeflateCompress() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_LEVEL, "6");
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE.getValue());
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE);
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid());
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt.zlib"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.zlib");
}
@Test
public void testFilenameUpdatedOnCompress() throws IOException {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP.getValue());
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP);
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid());
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.gz");
}
@Test
public void testDecompressFailure() throws IOException {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP);
byte[] data = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
runner.enqueue(data);
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid());
assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid());
runner.run();
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(ModifyCompression.REL_FAILURE).get(0).assertContentEquals(data);
runner.getFlowFilesForRelationship(ModifyCompression.REL_FAILURE).getFirst().assertContentEquals(data);
final LogMessage errorMessage = runner.getLogger().getErrorMessages().iterator().next();
final LogMessage errorMessage = runner.getLogger().getErrorMessages().getFirst();
assertNotNull(errorMessage);
final Optional<Object> exceptionFound = Arrays.stream(errorMessage.getArgs()).filter(Exception.class::isInstance).findFirst();
@ -306,80 +305,80 @@ class TestModifyCompression {
@Test
public void testLz4FramedCompress() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.LZ4_FRAMED.getMimeTypes()[0]);
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.lz4");
}
@Test
public void testLz4FramedDecompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt.lz4"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
}
@Test
public void testZstdCompress() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.ZSTD.getMimeTypes()[0]);
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.zst");
}
@Test
public void testZstdDecompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt.zst"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
}
@Test
public void testBrotliCompress() throws Exception {
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-brotli");
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.br");
}
@Test
public void testBrotliDecompress() throws Exception {
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI.getValue());
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue());
runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI);
runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED);
runner.enqueue(getSamplePath("SampleFile.txt.br"));
runner.run();
runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst();
flowFile.assertContentEquals(getSamplePath("SampleFile.txt"));
flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt");
}