From b2935410a2bcae73848e817578f2e777df517a45 Mon Sep 17 00:00:00 2001 From: Adam Lamar Date: Fri, 20 Feb 2015 15:50:46 +0000 Subject: [PATCH] NIFI-377: Add EncodeContent processor --- .../processors/standard/EncodeContent.java | 248 ++++++++++++++++++ .../util/ValidatingBase32InputStream.java | 78 ++++++ .../standard/TestEncodeContent.java | 164 ++++++++++++ 3 files changed, 490 insertions(+) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java new file mode 100644 index 0000000000..2465b56e07 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EncodeContent.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Base32InputStream; +import org.apache.commons.codec.binary.Base32OutputStream; + +import org.apache.commons.codec.binary.Base64InputStream; +import org.apache.commons.codec.binary.Base64OutputStream; +import org.apache.commons.codec.binary.Hex; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.standard.util.ValidatingBase32InputStream; +import org.apache.nifi.processors.standard.util.ValidatingBase64InputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"experimental", "encode", "decode", "base64", "hex"}) +@CapabilityDescription("Encodes the FlowFile content in base64") +public class EncodeContent extends AbstractProcessor { + + public static final String ENCODE_MODE = "Encode"; + public static final String DECODE_MODE = "Decode"; + + // List of support encodings. + public static final String BASE64_ENCODING = "base64"; + public static final String BASE32_ENCODING = "base32"; + public static final String HEX_ENCODING = "hex"; + + public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder() + .name("Mode") + .description("Specifies whether the content should be encoded or decoded") + .required(true) + .allowableValues(ENCODE_MODE, DECODE_MODE) + .defaultValue(ENCODE_MODE) + .build(); + + public static final PropertyDescriptor ENCODING = new PropertyDescriptor.Builder() + .name("Encoding") + .description("Specifies the type of encoding used") + .required(true) + .allowableValues(BASE64_ENCODING, BASE32_ENCODING, HEX_ENCODING) + .defaultValue(BASE64_ENCODING) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Any FlowFile that is successfully encoded or decoded will be routed to success").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be encoded or decoded will be routed to failure").build(); + + private List properties; + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List props = new ArrayList<>(); + props.add(MODE); + props.add(ENCODING); + this.properties = Collections.unmodifiableList(props); + + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + this.relationships = Collections.unmodifiableSet(rels); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final ProcessorLog logger = getLogger(); + + boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ENCODE_MODE); + String encoding = context.getProperty(ENCODING).getValue(); + StreamCallback encoder = null; + + // Select the encoder/decoder to use + if (encode) { + if (encoding.equalsIgnoreCase(BASE64_ENCODING)) { + encoder = new EncodeBase64(); + } else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) { + encoder = new EncodeBase32(); + } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) { + encoder = new EncodeHex(); + } + } else { + if (encoding.equalsIgnoreCase(BASE64_ENCODING)) { + encoder = new DecodeBase64(); + } else if (encoding.equalsIgnoreCase(BASE32_ENCODING)) { + encoder = new DecodeBase32(); + } else if (encoding.equalsIgnoreCase(HEX_ENCODING)) { + encoder = new DecodeHex(); + } + } + + if (encoder == null) { + logger.warn("Unknown operation: {} {}", new Object[]{encode ? "encode" : "decode", encoding}); + return; + } + + try { + final StopWatch stopWatch = new StopWatch(true); + flowFile = session.write(flowFile, encoder); + + logger.info("Successfully {} {}", new Object[]{encode ? "encoded" : "decoded", flowFile}); + session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + logger.error("Failed to {} {} due to {}", new Object[]{encode ? "encode" : "decode", flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + } + } + + private class EncodeBase64 implements StreamCallback { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base64OutputStream bos = new Base64OutputStream(out)) { + StreamUtils.copy(in, bos); + } + } + } + + private class DecodeBase64 implements StreamCallback { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base64InputStream bis = new Base64InputStream(new ValidatingBase64InputStream(in))) { + StreamUtils.copy(bis, out); + } + } + } + + private class EncodeBase32 implements StreamCallback { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base32OutputStream bos = new Base32OutputStream(out)) { + StreamUtils.copy(in, bos); + } + } + } + + private class DecodeBase32 implements StreamCallback { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try (Base32InputStream bis = new Base32InputStream(new ValidatingBase32InputStream(in))) { + StreamUtils.copy(bis, out); + } + } + } + + private static final byte[] HEX_CHARS = {'0', '1', '2', '3', '4', '5', '6', '7', + '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'}; + + private class EncodeHex implements StreamCallback { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + int len; + byte[] inBuf = new byte[8192]; + byte[] outBuf = new byte[inBuf.length * 2]; + while ((len = in.read(inBuf)) > 0) { + for (int i = 0; i < len; i++) { + outBuf[i*2] = HEX_CHARS[(inBuf[i] & 0xF0) >>> 4]; + outBuf[i*2 +1] = HEX_CHARS[inBuf[i] & 0x0F]; + } + out.write(outBuf, 0, len*2); + } + out.flush(); + } + } + + private class DecodeHex implements StreamCallback { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + int len; + byte[] inBuf = new byte[8192]; + Hex h = new Hex(); + while ((len = in.read(inBuf)) > 0) { + // If the input buffer is of odd length, try to get another byte + if (len % 2 != 0) { + int b = in.read(); + if (b != -1) { + inBuf[len] = (byte) b; + len++; + } + } + + // Construct a new buffer bounded to len + byte[] slice = Arrays.copyOfRange(inBuf, 0, len); + try { + out.write(h.decode(slice)); + } catch (DecoderException ex) { + throw new IOException(ex); + } + } + out.flush(); + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java new file mode 100644 index 0000000000..f7ebeabbe5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/ValidatingBase32InputStream.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.util; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import org.apache.commons.codec.binary.Base32; + + +/** + * An InputStream that throws an IOException if any byte is read that is not a + * valid Base32 character. Whitespace is considered valid. + */ +public class ValidatingBase32InputStream extends FilterInputStream { + + private final Base32 b32 = new Base32(); + + public ValidatingBase32InputStream(InputStream in) { + super(in); + } + + @Override + public int read(byte[] b, int offset, int len) throws IOException { + int numRead = super.read(b, offset, len); + if (numRead > 0) { + byte[] copy = b; + if (numRead < b.length) { + // isBase32 checks the whole length of byte[], we need to limit it to numRead + copy = Arrays.copyOf(b, numRead); + } + if (!b32.isInAlphabet(copy, true)) { + throw new IOException("Data is not base32 encoded."); + } + } + return numRead; + } + + @Override + public int read(byte[] b) throws IOException { + int numRead = super.read(b); + if (numRead > 0) { + byte[] copy = b; + if (numRead < b.length) { + // isBase32 checks the whole length of byte[], we need to limit it to numRead + copy = Arrays.copyOf(b, numRead); + } + if (!b32.isInAlphabet(copy, true)) { + throw new IOException("Data is not base32 encoded."); + } + } + return numRead; + } + + @Override + public int read() throws IOException { + int data = super.read(); + if (!b32.isInAlphabet((byte) data)) { + throw new IOException("Data is not base32 encoded."); + } + return super.read(); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java new file mode 100644 index 0000000000..fec411d56f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestEncodeContent.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import org.junit.Test; + +public class TestEncodeContent { + + @Test + public void testBase64RoundTrip() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE); + testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); + testRunner.assertQueueEmpty(); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); + testRunner.enqueue(flowFile); + testRunner.clearTransferState(); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); + + flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + } + + @Test + public void testFailDecodeNotBase64() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); + testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_FAILURE, 1); + } + + @Test + public void testFailDecodeNotBase64ButIsAMultipleOfFourBytes() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); + testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE64_ENCODING); + + testRunner.enqueue("four@@@@multiple".getBytes()); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_FAILURE, 1); + } + + @Test + public void testBase32RoundTrip() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE); + testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); + testRunner.assertQueueEmpty(); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); + testRunner.enqueue(flowFile); + testRunner.clearTransferState(); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); + + flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + } + + @Test + public void testFailDecodeNotBase32() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); + testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.BASE32_ENCODING); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_FAILURE, 1); + } + + @Test + public void testHexRoundTrip() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.ENCODE_MODE); + testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); + + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); + testRunner.assertQueueEmpty(); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); + testRunner.enqueue(flowFile); + testRunner.clearTransferState(); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_SUCCESS, 1); + + flowFile = testRunner.getFlowFilesForRelationship(EncodeContent.REL_SUCCESS).get(0); + flowFile.assertContentEquals(new File("src/test/resources/hello.txt")); + } + + @Test + public void testFailDecodeNotHex() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new EncodeContent()); + + testRunner.setProperty(EncodeContent.MODE, EncodeContent.DECODE_MODE); + testRunner.setProperty(EncodeContent.ENCODING, EncodeContent.HEX_ENCODING); + + testRunner.enqueue(Paths.get("src/test/resources/hello.txt")); + testRunner.clearTransferState(); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(EncodeContent.REL_FAILURE, 1); + } +}