From 5101db2ab736bab4f08acce90f7880f0cb06de78 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Wed, 25 Jan 2023 15:43:39 -0600 Subject: [PATCH] NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition - Deprecated Base64EncodeContent in favor of EncodeContent - Updated EncodeContent to use Commons Codec for Hexadecimal encoding Signed-off-by: Joe Witt --- .../standard/Base64EncodeContent.java | 5 + .../processors/standard/EncodeContent.java | 98 ++++++++----------- .../org.apache.nifi.processor.Processor | 1 + .../standard/TestEncodeContent.java | 24 ++--- 4 files changed, 62 insertions(+), 66 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java index cdade756d7..74f06e1dce 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java @@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.DeprecationNotice; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; @@ -54,6 +55,10 @@ import org.apache.nifi.util.StopWatch; @Tags({"encode", "base64"}) @CapabilityDescription("Encodes or decodes content to and from base64") @InputRequirement(Requirement.INPUT_REQUIRED) +@DeprecationNotice( + alternatives = EncodeContent.class, + reason = "EncodeContent supports Base64 and additional encoding schemes" +) public class Base64EncodeContent extends AbstractProcessor { public static final String ENCODE_MODE = "Encode"; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java index 469e4ecd92..869605b7d2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java @@ -19,10 +19,9 @@ package org.apache.nifi.processors.standard; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -33,7 +32,6 @@ import org.apache.commons.codec.binary.Base32OutputStream; import org.apache.commons.codec.binary.Base64InputStream; import org.apache.commons.codec.binary.Base64OutputStream; import org.apache.commons.codec.binary.Hex; -import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; @@ -42,11 +40,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processors.standard.util.ValidatingBase32InputStream; @@ -54,18 +50,16 @@ import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; -@EventDriven @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"encode", "decode", "base64", "hex"}) -@CapabilityDescription("Encodes the FlowFile content in base64") +@CapabilityDescription("Encode or decode contents using configurable encoding schemes") public class EncodeContent extends AbstractProcessor { public static final String ENCODE_MODE = "Encode"; public static final String DECODE_MODE = "Decode"; - // List of support encodings. public static final String BASE64_ENCODING = "base64"; public static final String BASE32_ENCODING = "base32"; public static final String HEX_ENCODING = "hex"; @@ -95,21 +89,23 @@ public class EncodeContent extends AbstractProcessor { .description("Any FlowFile that cannot be encoded or decoded will be routed to failure") .build(); - private List properties; - private Set relationships; + private static final int BUFFER_SIZE = 8192; - @Override - protected void init(final ProcessorInitializationContext context) { - final List props = new ArrayList<>(); - props.add(MODE); - props.add(ENCODING); - this.properties = Collections.unmodifiableList(props); + private static final List properties = Collections.unmodifiableList( + Arrays.asList( + MODE, + ENCODING + ) + ); - final Set rels = new HashSet<>(); - rels.add(REL_SUCCESS); - rels.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(rels); - } + private static final Set relationships = Collections.unmodifiableSet( + new LinkedHashSet<>( + Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ) + ) + ); @Override public Set getRelationships() { @@ -128,45 +124,37 @@ public class EncodeContent extends AbstractProcessor { return; } - final ComponentLog logger = getLogger(); + final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE); + final String encoding = context.getProperty(ENCODING).getValue(); + final StreamCallback callback; - boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE); - String encoding = context.getProperty(ENCODING).getValue(); - StreamCallback encoder = null; - - // Select the encoder/decoder to use if (encode) { if (encoding.equalsIgnoreCase(BASE64_ENCODING)) { - encoder = new EncodeBase64(); + callback = new EncodeBase64(); } else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) { - encoder = new EncodeBase32(); - } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) { - encoder = new EncodeHex(); + callback = new EncodeBase32(); + } else { + callback = new EncodeHex(); } } else { if (encoding.equalsIgnoreCase(BASE64_ENCODING)) { - encoder = new DecodeBase64(); + callback = new DecodeBase64(); } else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) { - encoder = new DecodeBase32(); - } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) { - encoder = new DecodeHex(); + callback = new DecodeBase32(); + } else { + callback = new DecodeHex(); } } - if (encoder == null) { - logger.warn("Unknown operation: {} {}", new Object[]{encode ? "encode" : "decode", encoding}); - return; - } - try { final StopWatch stopWatch = new StopWatch(true); - flowFile = session.write(flowFile, encoder); + flowFile = session.write(flowFile, callback); - logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile}); + getLogger().info("{} completed {}", encode ? "Encoding" : "Decoding", flowFile); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); - } catch (Exception e) { - logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e}); + } catch (final Exception e) { + getLogger().error("{} failed {}", encode ? "Encoding" : "Decoding", flowFile, e); session.transfer(flowFile, REL_FAILURE); } } @@ -174,7 +162,7 @@ public class EncodeContent extends AbstractProcessor { private static class EncodeBase64 implements StreamCallback { @Override - public void process(InputStream in, OutputStream out) throws IOException { + public void process(final InputStream in, final OutputStream out) throws IOException { try (Base64OutputStream bos = new Base64OutputStream(out)) { StreamUtils.copy(in, bos); } @@ -184,7 +172,7 @@ public class EncodeContent extends AbstractProcessor { private static class DecodeBase64 implements StreamCallback { @Override - public void process(InputStream in, OutputStream out) throws IOException { + public void process(final InputStream in, final OutputStream out) throws IOException { try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) { StreamUtils.copy(bis, out); } @@ -194,7 +182,7 @@ public class EncodeContent extends AbstractProcessor { private static class EncodeBase32 implements StreamCallback { @Override - public void process(InputStream in, OutputStream out) throws IOException { + public void process(final InputStream in, final OutputStream out) throws IOException { try (Base32OutputStream bos = new Base32OutputStream(out)) { StreamUtils.copy(in, bos); } @@ -204,19 +192,19 @@ public class EncodeContent extends AbstractProcessor { private static class DecodeBase32 implements StreamCallback { @Override - public void process(InputStream in, OutputStream out) throws IOException { + public void process(final InputStream in, final OutputStream out) throws IOException { try (Base32InputStream bis = new Base32InputStream(new ValidatingBase32InputStream(in))) { StreamUtils.copy(bis, out); } } } - private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; - private static class EncodeHex implements StreamCallback { + private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; + @Override - public void process(InputStream in, OutputStream out) throws IOException { + public void process(final InputStream in, final OutputStream out) throws IOException { int len; byte[] inBuf = new byte[8192]; byte[] outBuf = new byte[inBuf.length * 2]; @@ -234,9 +222,9 @@ public class EncodeContent extends AbstractProcessor { private static class DecodeHex implements StreamCallback { @Override - public void process(InputStream in, OutputStream out) throws IOException { + public void process(final InputStream in, final OutputStream out) throws IOException { int len; - byte[] inBuf = new byte[8192]; + byte[] inBuf = new byte[BUFFER_SIZE]; Hex h = new Hex(); while ((len = in.read(inBuf)) > 0) { // If the input buffer is of odd length, try to get another byte @@ -252,8 +240,8 @@ public class EncodeContent extends AbstractProcessor { byte[] slice = Arrays.copyOfRange(inBuf, 0, len); try { out.write(h.decode(slice)); - } catch (DecoderException ex) { - throw new IOException(ex); + } catch (final DecoderException e) { + throw new IOException("Hexadecimal decoding failed", e); } } out.flush(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index e0db844bb5..0ad3ce3fe8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -29,6 +29,7 @@ org.apache.nifi.processors.standard.DetectDuplicate org.apache.nifi.processors.standard.DeduplicateRecord org.apache.nifi.processors.standard.DistributeLoad org.apache.nifi.processors.standard.DuplicateFlowFile +org.apache.nifi.processors.standard.EncodeContent org.apache.nifi.processors.standard.EncryptContent org.apache.nifi.processors.standard.EnforceOrder org.apache.nifi.processors.standard.EvaluateJsonPath diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java index a0fcb28588..b8da70c754 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java @@ -16,8 +16,8 @@ */ package org.apache.nifi.processors.standard; -import java.io.File; import java.io.IOException; +import java.nio.file.Path; import java.nio.file.Paths; import org.apache.nifi.util.MockFlowFile; @@ -27,6 +27,8 @@ import org.junit.jupiter.api.Test; public class TestEncodeContent { + private static final Path FILE_PATH = Paths.get("src/test/resources/hello.txt"); + @Test public void testBase64RoundTrip() throws IOException { final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); @@ -34,7 +36,7 @@ public class TestEncodeContent { testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE); testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING); - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.enqueue(FILE_PATH); testRunner.clearTransferState(); testRunner.run(); @@ -50,7 +52,7 @@ public class TestEncodeContent { testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); - flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + flowFile.assertContentEquals(FILE_PATH); } @Test @@ -60,7 +62,7 @@ public class TestEncodeContent { testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING); - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.enqueue(FILE_PATH); testRunner.clearTransferState(); testRunner.run(); @@ -68,7 +70,7 @@ public class TestEncodeContent { } @Test - public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() throws IOException { + public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() { final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); @@ -88,7 +90,7 @@ public class TestEncodeContent { testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE); testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING); - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.enqueue(FILE_PATH); testRunner.clearTransferState(); testRunner.run(); @@ -104,7 +106,7 @@ public class TestEncodeContent { testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); - flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + flowFile.assertContentEquals(FILE_PATH); } @Test @@ -114,7 +116,7 @@ public class TestEncodeContent { testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING); - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.enqueue(FILE_PATH); testRunner.clearTransferState(); testRunner.run(); @@ -128,7 +130,7 @@ public class TestEncodeContent { testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE); testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING); - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.enqueue(FILE_PATH); testRunner.clearTransferState(); testRunner.run(); @@ -144,7 +146,7 @@ public class TestEncodeContent { testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); - flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + flowFile.assertContentEquals(FILE_PATH); } @Test @@ -154,7 +156,7 @@ public class TestEncodeContent { testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING); - testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.enqueue(FILE_PATH); testRunner.clearTransferState(); testRunner.run();