NIFI-11096 This closes #6889. Added EncodeContent to Processor services definition

- Deprecated Base64EncodeContent in favor of EncodeContent
- Updated EncodeContent to use Commons Codec for Hexadecimal encoding

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2023-01-25 15:43:39 -06:00 committed by Joe Witt
parent 7d99521d4c
commit 5101db2ab7
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
4 changed files with 62 additions and 66 deletions

View File

@ -34,6 +34,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@ -54,6 +55,10 @@ import org.apache.nifi.util.StopWatch;
@Tags({"encode", "base64"})
@CapabilityDescription("Encodes or decodes content to and from base64")
@InputRequirement(Requirement.INPUT_REQUIRED)
@DeprecationNotice(
alternatives = EncodeContent.class,
reason = "EncodeContent supports Base64 and additional encoding schemes"
)
public class Base64EncodeContent extends AbstractProcessor {
public static final String ENCODE_MODE = "Encode";

View File

@ -19,10 +19,9 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -33,7 +32,6 @@ import org.apache.commons.codec.binary.Base32OutputStream;
import org.apache.commons.codec.binary.Base64InputStream;
import org.apache.commons.codec.binary.Base64OutputStream;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -42,11 +40,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.standard.util.ValidatingBase32InputStream;
@ -54,18 +50,16 @@ import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"encode", "decode", "base64", "hex"})
@CapabilityDescription("Encodes the FlowFile content in base64")
@CapabilityDescription("Encode or decode contents using configurable encoding schemes")
public class EncodeContent extends AbstractProcessor {
public static final String ENCODE_MODE = "Encode";
public static final String DECODE_MODE = "Decode";
// List of support encodings.
public static final String BASE64_ENCODING = "base64";
public static final String BASE32_ENCODING = "base32";
public static final String HEX_ENCODING = "hex";
@ -95,21 +89,23 @@ public class EncodeContent extends AbstractProcessor {
.description("Any FlowFile that cannot be encoded or decoded will be routed to failure")
.build();
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
private static final int BUFFER_SIZE = 8192;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(MODE);
props.add(ENCODING);
this.properties = Collections.unmodifiableList(props);
private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(
MODE,
ENCODING
)
);
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(rels);
}
private static final Set<Relationship> relationships = Collections.unmodifiableSet(
new LinkedHashSet<>(
Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)
)
);
@Override
public Set<Relationship> getRelationships() {
@ -128,45 +124,37 @@ public class EncodeContent extends AbstractProcessor {
return;
}
final ComponentLog logger = getLogger();
final boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
final String encoding = context.getProperty(ENCODING).getValue();
final StreamCallback callback;
boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE);
String encoding = context.getProperty(ENCODING).getValue();
StreamCallback encoder = null;
// Select the encoder/decoder to use
if (encode) {
if (encoding.equalsIgnoreCase(BASE64_ENCODING)) {
encoder = new EncodeBase64();
callback = new EncodeBase64();
} else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) {
encoder = new EncodeBase32();
} else if (encoding.equalsIgnoreCase(HEX_ENCODING)) {
encoder = new EncodeHex();
callback = new EncodeBase32();
} else {
callback = new EncodeHex();
}
} else {
if (encoding.equalsIgnoreCase(BASE64_ENCODING)) {
encoder = new DecodeBase64();
callback = new DecodeBase64();
} else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) {
encoder = new DecodeBase32();
} else if (encoding.equalsIgnoreCase(HEX_ENCODING)) {
encoder = new DecodeHex();
callback = new DecodeBase32();
} else {
callback = new DecodeHex();
}
}
if (encoder == null) {
logger.warn("Unknown operation: {} {}", new Object[]{encode ? "encode" : "decode", encoding});
return;
}
try {
final StopWatch stopWatch = new StopWatch(true);
flowFile = session.write(flowFile, encoder);
flowFile = session.write(flowFile, callback);
logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile});
getLogger().info("{} completed {}", encode ? "Encoding" : "Decoding", flowFile);
session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e});
} catch (final Exception e) {
getLogger().error("{} failed {}", encode ? "Encoding" : "Decoding", flowFile, e);
session.transfer(flowFile, REL_FAILURE);
}
}
@ -174,7 +162,7 @@ public class EncodeContent extends AbstractProcessor {
private static class EncodeBase64 implements StreamCallback {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base64OutputStream bos = new Base64OutputStream(out)) {
StreamUtils.copy(in, bos);
}
@ -184,7 +172,7 @@ public class EncodeContent extends AbstractProcessor {
private static class DecodeBase64 implements StreamCallback {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) {
StreamUtils.copy(bis, out);
}
@ -194,7 +182,7 @@ public class EncodeContent extends AbstractProcessor {
private static class EncodeBase32 implements StreamCallback {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base32OutputStream bos = new Base32OutputStream(out)) {
StreamUtils.copy(in, bos);
}
@ -204,19 +192,19 @@ public class EncodeContent extends AbstractProcessor {
private static class DecodeBase32 implements StreamCallback {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
public void process(final InputStream in, final OutputStream out) throws IOException {
try (Base32InputStream bis = new Base32InputStream(new ValidatingBase32InputStream(in))) {
StreamUtils.copy(bis, out);
}
}
}
private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
private static class EncodeHex implements StreamCallback {
private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
@Override
public void process(InputStream in, OutputStream out) throws IOException {
public void process(final InputStream in, final OutputStream out) throws IOException {
int len;
byte[] inBuf = new byte[8192];
byte[] outBuf = new byte[inBuf.length * 2];
@ -234,9 +222,9 @@ public class EncodeContent extends AbstractProcessor {
private static class DecodeHex implements StreamCallback {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
public void process(final InputStream in, final OutputStream out) throws IOException {
int len;
byte[] inBuf = new byte[8192];
byte[] inBuf = new byte[BUFFER_SIZE];
Hex h = new Hex();
while ((len = in.read(inBuf)) > 0) {
// If the input buffer is of odd length, try to get another byte
@ -252,8 +240,8 @@ public class EncodeContent extends AbstractProcessor {
byte[] slice = Arrays.copyOfRange(inBuf, 0, len);
try {
out.write(h.decode(slice));
} catch (DecoderException ex) {
throw new IOException(ex);
} catch (final DecoderException e) {
throw new IOException("Hexadecimal decoding failed", e);
}
}
out.flush();

View File

@ -29,6 +29,7 @@ org.apache.nifi.processors.standard.DetectDuplicate
org.apache.nifi.processors.standard.DeduplicateRecord
org.apache.nifi.processors.standard.DistributeLoad
org.apache.nifi.processors.standard.DuplicateFlowFile
org.apache.nifi.processors.standard.EncodeContent
org.apache.nifi.processors.standard.EncryptContent
org.apache.nifi.processors.standard.EnforceOrder
org.apache.nifi.processors.standard.EvaluateJsonPath

View File

@ -16,8 +16,8 @@
*/
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.nifi.util.MockFlowFile;
@ -27,6 +27,8 @@ import org.junit.jupiter.api.Test;
public class TestEncodeContent {
private static final Path FILE_PATH = Paths.get("src/test/resources/hello.txt");
@Test
public void testBase64RoundTrip() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent());
@ -34,7 +36,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@ -50,7 +52,7 @@ public class TestEncodeContent {
testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
flowFile.assertContentEquals(FILE_PATH);
}
@Test
@ -60,7 +62,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@ -68,7 +70,7 @@ public class TestEncodeContent {
}
@Test
public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() throws IOException {
public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() {
final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent());
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
@ -88,7 +90,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@ -104,7 +106,7 @@ public class TestEncodeContent {
testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
flowFile.assertContentEquals(FILE_PATH);
}
@Test
@ -114,7 +116,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@ -128,7 +130,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();
@ -144,7 +146,7 @@ public class TestEncodeContent {
testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1);
flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0);
flowFile.assertContentEquals(new File("src/test/resources/hello.txt"));
flowFile.assertContentEquals(FILE_PATH);
}
@Test
@ -154,7 +156,7 @@ public class TestEncodeContent {
testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE);
testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING);
testRunner.enqueue(Paths.get("src/test/resources/hello.txt"));
testRunner.enqueue(FILE_PATH);
testRunner.clearTransferState();
testRunner.run();