From 655baec4cbd6546d129c505cd5be510acadb52f3 Mon Sep 17 00:00:00 2001 From: EndzeitBegins <16666115+EndzeitBegins@users.noreply.github.com> Date: Sun, 7 Jan 2024 20:27:11 +0100 Subject: [PATCH] NIFI-12578 Updated nifi-compress-bundle using current API methods This closes #8215 Signed-off-by: David Handermann --- .../compress/ModifyCompression.java | 217 ++++++++---------- .../property/CompressionStrategy.java | 15 -- .../compress/TestModifyCompression.java | 139 ++++++----- 3 files changed, 164 insertions(+), 207 deletions(-) diff --git a/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/ModifyCompression.java b/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/ModifyCompression.java index f4a3653e03..884357fcfd 100644 --- a/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/ModifyCompression.java +++ b/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/ModifyCompression.java @@ -67,13 +67,11 @@ import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -100,19 +98,8 @@ public class ModifyCompression extends AbstractProcessor { .name("Input Compression Strategy") .displayName("Input Compression Strategy") .description("The strategy to use for decompressing input FlowFiles") - .allowableValues(CompressionStrategy.NONE.asAllowableValue(), - CompressionStrategy.MIME_TYPE_ATTRIBUTE.asAllowableValue(), - CompressionStrategy.GZIP.asAllowableValue(), - CompressionStrategy.DEFLATE.asAllowableValue(), - CompressionStrategy.BZIP2.asAllowableValue(), - CompressionStrategy.XZ_LZMA2.asAllowableValue(), - CompressionStrategy.LZMA.asAllowableValue(), - CompressionStrategy.SNAPPY.asAllowableValue(), - CompressionStrategy.SNAPPY_FRAMED.asAllowableValue(), - CompressionStrategy.LZ4_FRAMED.asAllowableValue(), - CompressionStrategy.ZSTD.asAllowableValue(), - CompressionStrategy.BROTLI.asAllowableValue()) - .defaultValue(CompressionStrategy.NONE.getValue()) + .allowableValues(EnumSet.complementOf(EnumSet.of(CompressionStrategy.SNAPPY_HADOOP))) + .defaultValue(CompressionStrategy.NONE) .required(true) .build(); @@ -120,19 +107,8 @@ public class ModifyCompression extends AbstractProcessor { .name("Output Compression Strategy") .name("Output Compression Strategy") .description("The strategy to use for compressing output FlowFiles") - .allowableValues(CompressionStrategy.NONE.asAllowableValue(), - CompressionStrategy.GZIP.asAllowableValue(), - CompressionStrategy.DEFLATE.asAllowableValue(), - CompressionStrategy.BZIP2.asAllowableValue(), - CompressionStrategy.XZ_LZMA2.asAllowableValue(), - CompressionStrategy.LZMA.asAllowableValue(), - CompressionStrategy.SNAPPY.asAllowableValue(), - CompressionStrategy.SNAPPY_HADOOP.asAllowableValue(), - CompressionStrategy.SNAPPY_FRAMED.asAllowableValue(), - CompressionStrategy.LZ4_FRAMED.asAllowableValue(), - CompressionStrategy.ZSTD.asAllowableValue(), - CompressionStrategy.BROTLI.asAllowableValue()) - .defaultValue(CompressionStrategy.NONE.getValue()) + .allowableValues(EnumSet.complementOf(EnumSet.of(CompressionStrategy.MIME_TYPE_ATTRIBUTE))) + .defaultValue(CompressionStrategy.NONE) .required(true) .build(); @@ -160,7 +136,7 @@ public class ModifyCompression extends AbstractProcessor { .description("Processing strategy for filename attribute on output FlowFiles") .required(true) .allowableValues(FilenameStrategy.class) - .defaultValue(FilenameStrategy.UPDATED.getValue()) + .defaultValue(FilenameStrategy.UPDATED) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() @@ -173,17 +149,14 @@ public class ModifyCompression extends AbstractProcessor { .description("FlowFiles will be transferred to the failure relationship on compression modification errors") .build(); - private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( + private static final List PROPERTIES = List.of( INPUT_COMPRESSION_STRATEGY, OUTPUT_COMPRESSION_STRATEGY, OUTPUT_COMPRESSION_LEVEL, OUTPUT_FILENAME_STRATEGY - )); + ); - private static final Set RELATIONSHIPS = Collections.unmodifiableSet(new LinkedHashSet<>(Arrays.asList( - REL_SUCCESS, - REL_FAILURE - ))); + private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE); private static final Map compressionFormatMimeTypeMap; @@ -222,7 +195,7 @@ public class ModifyCompression extends AbstractProcessor { } final CompressionStrategy inputCompressionStrategy; - final CompressionStrategy configuredInputCompressionStrategy = getCompressionStrategy(context.getProperty(INPUT_COMPRESSION_STRATEGY).getValue()); + final CompressionStrategy configuredInputCompressionStrategy = context.getProperty(INPUT_COMPRESSION_STRATEGY).asDescribedValue(CompressionStrategy.class); if (CompressionStrategy.MIME_TYPE_ATTRIBUTE == configuredInputCompressionStrategy) { final String mimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); if (mimeType == null) { @@ -241,7 +214,7 @@ public class ModifyCompression extends AbstractProcessor { inputCompressionStrategy = configuredInputCompressionStrategy; } - final CompressionStrategy outputCompressionStrategy = getCompressionStrategy(context.getProperty(OUTPUT_COMPRESSION_STRATEGY).getValue()); + final CompressionStrategy outputCompressionStrategy = context.getProperty(OUTPUT_COMPRESSION_STRATEGY).asDescribedValue(CompressionStrategy.class); final AtomicReference mimeTypeRef = new AtomicReference<>(null); final StopWatch stopWatch = new StopWatch(true); final long inputFileSize = flowFile.getSize(); @@ -284,40 +257,34 @@ public class ModifyCompression extends AbstractProcessor { } private InputStream getCompressionInputStream(final CompressionStrategy compressionFormat, final InputStream parentInputStream) throws IOException { - if (CompressionStrategy.LZMA == compressionFormat) { - return new LzmaInputStream(parentInputStream, new Decoder()); - } else if (CompressionStrategy.XZ_LZMA2 == compressionFormat) { - return new XZInputStream(parentInputStream); - } else if (CompressionStrategy.BZIP2 == compressionFormat) { - // need this two-arg constructor to support concatenated streams - return new BZip2CompressorInputStream(parentInputStream, true); - } else if (CompressionStrategy.GZIP == compressionFormat) { - return new GzipCompressorInputStream(parentInputStream, true); - } else if (CompressionStrategy.DEFLATE == compressionFormat) { - return new InflaterInputStream(parentInputStream); - } else if (CompressionStrategy.SNAPPY == compressionFormat) { - return new SnappyInputStream(parentInputStream); - } else if (CompressionStrategy.SNAPPY_HADOOP == compressionFormat) { - throw new IOException("Cannot decompress snappy-hadoop"); - } else if (CompressionStrategy.SNAPPY_FRAMED == compressionFormat) { - return new SnappyFramedInputStream(parentInputStream); - } else if (CompressionStrategy.LZ4_FRAMED == compressionFormat) { - return new FramedLZ4CompressorInputStream(parentInputStream, true); - } else if (CompressionStrategy.ZSTD == compressionFormat) { - return new ZstdCompressorInputStream(parentInputStream); - } else if (CompressionStrategy.BROTLI == compressionFormat) { - Brotli4jLoader.ensureAvailability(); - return new BrotliInputStream(parentInputStream); - } else if (CompressionStrategy.NONE == compressionFormat) { - return parentInputStream; - } else { - final String compressorStreamFormat = compressionFormat.getValue().toLowerCase(); - try { - return new CompressorStreamFactory().createCompressorInputStream(compressorStreamFormat, parentInputStream); - } catch (final CompressorException e) { - throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e); + return switch (compressionFormat) { + case LZMA -> new LzmaInputStream(parentInputStream, new Decoder()); + case XZ_LZMA2 -> new XZInputStream(parentInputStream); + case BZIP2 -> { + // need this two-arg constructor to support concatenated streams + yield new BZip2CompressorInputStream(parentInputStream, true); } - } + case GZIP -> new GzipCompressorInputStream(parentInputStream, true); + case DEFLATE -> new InflaterInputStream(parentInputStream); + case SNAPPY -> new SnappyInputStream(parentInputStream); + case SNAPPY_HADOOP -> throw new IOException("Cannot decompress snappy-hadoop"); + case SNAPPY_FRAMED -> new SnappyFramedInputStream(parentInputStream); + case LZ4_FRAMED -> new FramedLZ4CompressorInputStream(parentInputStream, true); + case ZSTD -> new ZstdCompressorInputStream(parentInputStream); + case BROTLI -> { + Brotli4jLoader.ensureAvailability(); + yield new BrotliInputStream(parentInputStream); + } + case NONE -> parentInputStream; + default -> { + final String compressorStreamFormat = compressionFormat.getValue().toLowerCase(); + try { + yield new CompressorStreamFactory().createCompressorInputStream(compressorStreamFormat, parentInputStream); + } catch (final CompressorException e) { + throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e); + } + } + }; } private OutputStream getCompressionOutputStream( @@ -327,54 +294,65 @@ public class ModifyCompression extends AbstractProcessor { final OutputStream parentOutputStream ) throws IOException { final OutputStream compressionOut; - if (CompressionStrategy.GZIP == compressionFormat) { - compressionOut = new GZIPOutputStream(parentOutputStream, compressionLevel); - mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]); - } else if (CompressionStrategy.DEFLATE == compressionFormat) { - compressionOut = new DeflaterOutputStream(parentOutputStream, new Deflater(compressionLevel)); - mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]); - } else if (CompressionStrategy.LZMA == compressionFormat) { - compressionOut = new LzmaOutputStream.Builder(parentOutputStream).build(); - mimeTypeRef.set(CompressionStrategy.LZMA.getMimeTypes()[0]); - } else if (CompressionStrategy.XZ_LZMA2 == compressionFormat) { - compressionOut = new XZOutputStream(parentOutputStream, new LZMA2Options(compressionLevel)); - mimeTypeRef.set(CompressionStrategy.XZ_LZMA2.getMimeTypes()[0]); - } else if (CompressionStrategy.SNAPPY == compressionFormat) { - compressionOut = new SnappyOutputStream(parentOutputStream); - mimeTypeRef.set(CompressionStrategy.SNAPPY.getMimeTypes()[0]); - } else if (CompressionStrategy.SNAPPY_HADOOP == compressionFormat) { - compressionOut = new SnappyHadoopCompatibleOutputStream(parentOutputStream); - mimeTypeRef.set(CompressionStrategy.SNAPPY_HADOOP.getMimeTypes()[0]); - } else if (CompressionStrategy.SNAPPY_FRAMED == compressionFormat) { - compressionOut = new SnappyFramedOutputStream(parentOutputStream); - mimeTypeRef.set(CompressionStrategy.SNAPPY_FRAMED.getMimeTypes()[0]); - } else if (CompressionStrategy.LZ4_FRAMED == compressionFormat) { - final String compressorStreamFormat = compressionFormat.getValue().toLowerCase(); - try { - compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream); - } catch (final CompressorException e) { - throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e); + switch (compressionFormat) { + case GZIP -> { + compressionOut = new GZIPOutputStream(parentOutputStream, compressionLevel); + mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]); } - mimeTypeRef.set(CompressionStrategy.LZ4_FRAMED.getMimeTypes()[0]); - } else if (CompressionStrategy.ZSTD == compressionFormat) { - final int outputCompressionLevel = compressionLevel * 2; - compressionOut = new ZstdCompressorOutputStream(parentOutputStream, outputCompressionLevel); - mimeTypeRef.set(CompressionStrategy.ZSTD.getMimeTypes()[0]); - } else if (CompressionStrategy.BROTLI == compressionFormat) { - Brotli4jLoader.ensureAvailability(); - Encoder.Parameters params = new Encoder.Parameters().setQuality(compressionLevel); - compressionOut = new BrotliOutputStream(parentOutputStream, params); - mimeTypeRef.set(CompressionStrategy.BROTLI.getMimeTypes()[0]); - } else if (CompressionStrategy.BZIP2 == compressionFormat) { - final String compressorStreamFormat = compressionFormat.getValue().toLowerCase(); - try { - compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream); - } catch (final CompressorException e) { - throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e); + case DEFLATE -> { + compressionOut = new DeflaterOutputStream(parentOutputStream, new Deflater(compressionLevel)); + mimeTypeRef.set(CompressionStrategy.GZIP.getMimeTypes()[0]); } - mimeTypeRef.set(CompressionStrategy.BZIP2.getMimeTypes()[0]); - } else { - compressionOut = parentOutputStream; + case LZMA -> { + compressionOut = new LzmaOutputStream.Builder(parentOutputStream).build(); + mimeTypeRef.set(CompressionStrategy.LZMA.getMimeTypes()[0]); + } + case XZ_LZMA2 -> { + compressionOut = new XZOutputStream(parentOutputStream, new LZMA2Options(compressionLevel)); + mimeTypeRef.set(CompressionStrategy.XZ_LZMA2.getMimeTypes()[0]); + } + case SNAPPY -> { + compressionOut = new SnappyOutputStream(parentOutputStream); + mimeTypeRef.set(CompressionStrategy.SNAPPY.getMimeTypes()[0]); + } + case SNAPPY_HADOOP -> { + compressionOut = new SnappyHadoopCompatibleOutputStream(parentOutputStream); + mimeTypeRef.set(CompressionStrategy.SNAPPY_HADOOP.getMimeTypes()[0]); + } + case SNAPPY_FRAMED -> { + compressionOut = new SnappyFramedOutputStream(parentOutputStream); + mimeTypeRef.set(CompressionStrategy.SNAPPY_FRAMED.getMimeTypes()[0]); + } + case LZ4_FRAMED -> { + final String compressorStreamFormat = compressionFormat.getValue().toLowerCase(); + try { + compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream); + } catch (final CompressorException e) { + throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e); + } + mimeTypeRef.set(CompressionStrategy.LZ4_FRAMED.getMimeTypes()[0]); + } + case ZSTD -> { + final int outputCompressionLevel = compressionLevel * 2; + compressionOut = new ZstdCompressorOutputStream(parentOutputStream, outputCompressionLevel); + mimeTypeRef.set(CompressionStrategy.ZSTD.getMimeTypes()[0]); + } + case BROTLI -> { + Brotli4jLoader.ensureAvailability(); + Encoder.Parameters params = new Encoder.Parameters().setQuality(compressionLevel); + compressionOut = new BrotliOutputStream(parentOutputStream, params); + mimeTypeRef.set(CompressionStrategy.BROTLI.getMimeTypes()[0]); + } + case BZIP2 -> { + final String compressorStreamFormat = compressionFormat.getValue().toLowerCase(); + try { + compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressorStreamFormat, parentOutputStream); + } catch (final CompressorException e) { + throw new IOException(String.format("Compressor Stream Format [%s] creation failed", compressorStreamFormat), e); + } + mimeTypeRef.set(CompressionStrategy.BZIP2.getMimeTypes()[0]); + } + case null, default -> compressionOut = parentOutputStream; } return compressionOut; } @@ -391,9 +369,4 @@ public class ModifyCompression extends AbstractProcessor { } return truncatedFilename + outputCompressionStrategy.getFileExtension(); } - - private CompressionStrategy getCompressionStrategy(final String propertyValue) { - final Optional compressionInfo = CompressionStrategy.findValue(propertyValue); - return compressionInfo.orElseThrow(() -> new IllegalArgumentException(String.format("Compression Format [%s] not supported", propertyValue))); - } } diff --git a/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/property/CompressionStrategy.java b/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/property/CompressionStrategy.java index 8459c49b36..ec37a246d6 100644 --- a/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/property/CompressionStrategy.java +++ b/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/main/java/org/apache/nifi/processors/compress/property/CompressionStrategy.java @@ -16,14 +16,9 @@ */ package org.apache.nifi.processors.compress.property; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.DescribedValue; -import java.util.Arrays; -import java.util.Optional; - public enum CompressionStrategy implements DescribedValue { - NONE("no compression", "No Compression", ""), MIME_TYPE_ATTRIBUTE("use mime.type attribute", "Use the [mime.type] attribute from the input FlowFile to determine the format", ""), GZIP("gzip", "GZIP", ".gz","application/gzip", "application/x-gzip"), @@ -43,12 +38,6 @@ public enum CompressionStrategy implements DescribedValue { private final String fileExtension; private final String[] mimeTypes; - public static Optional findValue(final String value) { - return Arrays.stream(CompressionStrategy.values()) - .filter((compressionStrategy -> compressionStrategy.getValue().equalsIgnoreCase(value))) - .findFirst(); - } - CompressionStrategy(final String value, final String description, final String fileExtension, final String... mimeTypes) { this.value = value; this.description = description; @@ -78,8 +67,4 @@ public enum CompressionStrategy implements DescribedValue { public String[] getMimeTypes() { return mimeTypes; } - - public AllowableValue asAllowableValue() { - return new AllowableValue(value, value, description); - } } diff --git a/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/test/java/org/apache/nifi/processors/compress/TestModifyCompression.java b/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/test/java/org/apache/nifi/processors/compress/TestModifyCompression.java index 0f4aabddea..e98f381924 100644 --- a/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/test/java/org/apache/nifi/processors/compress/TestModifyCompression.java +++ b/nifi-nar-bundles/nifi-compress-bundle/nifi-compress-processors/src/test/java/org/apache/nifi/processors/compress/TestModifyCompression.java @@ -48,107 +48,107 @@ class TestModifyCompression { @Test public void testSnappyCompress() throws Exception { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.SNAPPY.getMimeTypes()[0]); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.snappy"); } @Test public void testSnappyDecompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt.snappy")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); } @Test public void testSnappyHadoopCompress() throws Exception { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.SNAPPY_HADOOP.getMimeTypes()[0]); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.snappy"); } @Test public void testSnappyHadoopDecompress() { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_HADOOP); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.assertNotValid(); } @Test public void testSnappyFramedCompress() throws Exception { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.SNAPPY_FRAMED.getMimeTypes()[0]); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.sz"); } @Test public void testSnappyFramedDecompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.SNAPPY_FRAMED); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt.sz")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); } @Test public void testBzip2DecompressConcatenated() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL); runner.enqueue(getSamplePath("SampleFileConcat.txt.bz2")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFileConcat.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFileConcat.txt.bz2"); // not updating filename } @Test public void testBzip2DecompressLz4FramedCompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt.bz2")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.lz4"); runner.clearTransferState(); @@ -156,27 +156,27 @@ class TestModifyCompression { runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile1.txt.lz4"); } @Test public void testProperMimeTypeFromBzip2() throws Exception { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL.getValue()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BZIP2); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.ORIGINAL); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-bzip2"); } @Test public void testBzip2DecompressWithBothMimeTypes() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); // ensure that we can decompress with a mime type of application/x-bzip2 final Map attributes = new HashMap<>(); @@ -185,7 +185,7 @@ class TestModifyCompression { runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); @@ -198,21 +198,21 @@ class TestModifyCompression { runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile1.txt"); } @Test public void testGzipDecompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP.getValue()); - assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP); + assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid()); runner.enqueue(getSamplePath("/SampleFile.txt.gz")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); @@ -221,83 +221,82 @@ class TestModifyCompression { runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile1.txt"); runner.clearTransferState(); - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.MIME_TYPE_ATTRIBUTE); Map attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.GZIP.getMimeTypes()[0]); runner.enqueue(getSamplePath("SampleFile.txt.gz"), attributes); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); } @Test public void testDeflateDecompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE.getValue()); - assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE); + assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid()); runner.enqueue(getSamplePath("SampleFile.txt.zlib")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); System.err.println(new String(flowFile.toByteArray())); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); } - @Test public void testDeflateCompress() throws Exception { runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_LEVEL, "6"); - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE.getValue()); - assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.DEFLATE); + assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid()); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt.zlib")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.zlib"); } @Test public void testFilenameUpdatedOnCompress() throws IOException { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP.getValue()); - assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP); + assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid()); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.gz"); } @Test public void testDecompressFailure() throws IOException { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.GZIP); byte[] data = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; runner.enqueue(data); - assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()).isValid()); + assertTrue(runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED).isValid()); runner.run(); runner.assertQueueEmpty(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_FAILURE, 1); - runner.getFlowFilesForRelationship(ModifyCompression.REL_FAILURE).get(0).assertContentEquals(data); + runner.getFlowFilesForRelationship(ModifyCompression.REL_FAILURE).getFirst().assertContentEquals(data); - final LogMessage errorMessage = runner.getLogger().getErrorMessages().iterator().next(); + final LogMessage errorMessage = runner.getLogger().getErrorMessages().getFirst(); assertNotNull(errorMessage); final Optional exceptionFound = Arrays.stream(errorMessage.getArgs()).filter(Exception.class::isInstance).findFirst(); @@ -306,80 +305,80 @@ class TestModifyCompression { @Test public void testLz4FramedCompress() throws Exception { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.LZ4_FRAMED.getMimeTypes()[0]); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.lz4"); } @Test public void testLz4FramedDecompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.LZ4_FRAMED); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt.lz4")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); } @Test public void testZstdCompress() throws Exception { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), CompressionStrategy.ZSTD.getMimeTypes()[0]); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.zst"); } @Test public void testZstdDecompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.ZSTD); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt.zst")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); } @Test public void testBrotliCompress() throws Exception { - runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.OUTPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/x-brotli"); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt.br"); } @Test public void testBrotliDecompress() throws Exception { - runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI.getValue()); - runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED.getValue()); + runner.setProperty(ModifyCompression.INPUT_COMPRESSION_STRATEGY, CompressionStrategy.BROTLI); + runner.setProperty(ModifyCompression.OUTPUT_FILENAME_STRATEGY, FilenameStrategy.UPDATED); runner.enqueue(getSamplePath("SampleFile.txt.br")); runner.run(); runner.assertAllFlowFilesTransferred(ModifyCompression.REL_SUCCESS, 1); - MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).get(0); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(ModifyCompression.REL_SUCCESS).getFirst(); flowFile.assertContentEquals(getSamplePath("SampleFile.txt")); flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), "SampleFile.txt"); }