diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE index d923dde51b..dcf296d039 100644 --- a/nifi-assembly/LICENSE +++ b/nifi-assembly/LICENSE @@ -1342,3 +1342,20 @@ For details see https://github.com/svenkubiak/jBCrypt/blob/0.4.1/LICENSE WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +This product bundles 'Jolt' which is available under an Apache License. +For details see https://github.com/bazaarvoice/jolt/blob/jolt-0.0.20/LICENSE + + Copyright 2013-2014 Bazaarvoice, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 293088a23d..65b7b293ba 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -602,6 +602,10 @@ The following binary components are provided under the Apache Software License v The following NOTICE information applies: Copyright 2011 JsonPath authors + (ASLv2) Jolt + The following NOTICE information applies: + Copyright 2013-2014 Bazaarvoice, Inc + (ASLv2) Kite SDK The following NOTICE information applies: This product includes software developed by Cloudera, Inc. diff --git a/nifi-docs/src/main/asciidoc/getting-started.adoc b/nifi-docs/src/main/asciidoc/getting-started.adoc index a68f171fd7..486606f244 100644 --- a/nifi-docs/src/main/asciidoc/getting-started.adoc +++ b/nifi-docs/src/main/asciidoc/getting-started.adoc @@ -285,6 +285,7 @@ categorizing them by their functions. - *EncryptContent*: Encrypt or Decrypt Content - *ReplaceText*: Use Regular Expressions to modify textual Content - *TransformXml*: Apply an XSLT transform to XML Content +- *TransformJSON*: Apply a JOLT specification to transform JSON Content === Routing and Mediation - *ControlRate*: Throttle the rate at which data can flow through one part of the flow diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE index a38d06b0b1..18c9df15bc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/LICENSE @@ -335,3 +335,19 @@ The binary distribution of this product bundles 'jBCrypt' which is available und WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +This product bundles 'Jolt' which is available under an Apache License. For details see https://github.com/bazaarvoice/jolt/blob/jolt-0.0.20/LICENSE + + Copyright 2013-2014 Bazaarvoice, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE index 364ce56fcb..e723250028 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE @@ -92,6 +92,10 @@ The following binary components are provided under the Apache Software License v The following NOTICE information applies: Copyright 2011 JsonPath authors + (ASLv2) Jolt + The following NOTICE information applies: + Copyright 2013-2014 Bazaarvoice, Inc + (ASLv2) Apache Commons Codec The following NOTICE information applies: Apache Commons Codec diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index cd87101e0c..17e630a6f4 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -225,6 +225,16 @@ language governing permissions and limitations under the License. --> 1.4.187 test + + com.bazaarvoice.jolt + jolt-core + 0.0.20 + + + com.bazaarvoice.jolt + json-utils + 0.0.20 + @@ -299,6 +309,20 @@ language governing permissions and limitations under the License. --> src/test/resources/TestSplitText/4.txt src/test/resources/TestSplitText/5.txt src/test/resources/TestSplitText/6.txt + src/test/resources/TestTransformJson/input.json + src/test/resources/TestTransformJson/chainrSpec.json + src/test/resources/TestTransformJson/chainrOutput.json + src/test/resources/TestTransformJson/cardrSpec.json + src/test/resources/TestTransformJson/cardrOutput.json + src/test/resources/TestTransformJson/defaultrSpec.json + src/test/resources/TestTransformJson/defaultrOutput.json + src/test/resources/TestTransformJson/shiftrSpec.json + src/test/resources/TestTransformJson/sortrOutput.json + src/test/resources/TestTransformJson/shiftrOutput.json + src/test/resources/TestTransformJson/removrSpec.json + src/test/resources/TestTransformJson/removrOutput.json + src/test/resources/TestTransformJson/defaultrSpec.json + src/test/resources/TestTransformJson/defaultrOutput.json src/test/resources/TestSplitText/original.txt src/test/resources/TestTransformXml/math.html src/test/resources/TestTransformXml/tokens.csv diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java new file mode 100644 index 0000000000..c8576aafcb --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import com.bazaarvoice.jolt.CardinalityTransform; +import com.bazaarvoice.jolt.Shiftr; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Chainr; +import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.Sortr; +import com.bazaarvoice.jolt.Transform; +import com.bazaarvoice.jolt.JsonUtils; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"json", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr","cardinality","sort"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "mime.type",description = "Always set to application/json") +@CapabilityDescription("Applies a list of JOLT specifications to the flowfile JSON payload. A new FlowFile is created " + + "with transformed content and is routed to the 'success' relationship. If the JSON transform " + + "fails, the original FlowFile is routed to the 'failure' relationship.") +public class TransformJSON extends AbstractProcessor { + + public static final AllowableValue SHIFTR = new AllowableValue("jolt-transform-shift", "Shift Transform DSL", "Shift input JSON/data to create the output JSON."); + public static final AllowableValue CHAINR = new AllowableValue("jolt-transform-chain", "Chain Transform DSL", "Execute list of JOLT transformations."); + public static final AllowableValue DEFAULTR = new AllowableValue("jolt-transform-default", "Default Transform DSL", " Apply default values to the output JSON."); + public static final AllowableValue REMOVR = new AllowableValue("jolt-transform-remove", "Remove Transform DSL", " Remove values from input data to create the output JSON."); + public static final AllowableValue CARDINALITY = new AllowableValue("jolt-transform-card", "Cardinality Transform DSL", "Change the cardinality of input elements to create the output JSON."); + public static final AllowableValue SORTR = new AllowableValue("jolt-transform-sort", "Sort Transform DSL", "Sort input json key values alphabetically. Any specification set is ignored."); + + public static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder() + .name("jolt-transform") + .displayName("Jolt Transformation") + .description("Specifies the Jolt Transformation that should be used with the provided specification.") + .required(true) + .allowableValues(CARDINALITY, CHAINR, DEFAULTR, REMOVR, SHIFTR, SORTR) + .defaultValue(CHAINR.getValue()) + .build(); + + public static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder() + .name("jolt-spec") + .displayName("Jolt Specification") + .description("Jolt Specification for transform of JSON data. This value is ignored if the Jolt Sort Transformation is selected.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The FlowFile with transformed content will be routed to this relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid JSON), it will be routed to this relationship") + .build(); + + private final static List properties; + private final static Set relationships; + private volatile Transform transform; + private final static String DEFAULT_CHARSET = "UTF-8"; + + static{ + + final List _properties = new ArrayList<>(); + _properties.add(JOLT_TRANSFORM); + _properties.add(JOLT_SPEC); + properties = Collections.unmodifiableList(_properties); + + final Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); + + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(super.customValidate(validationContext)); + String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue(); + String specValue = validationContext.getProperty(JOLT_SPEC).isSet() ? validationContext.getProperty(JOLT_SPEC).getValue() : null; + + if(StringUtils.isEmpty(specValue)){ + if(!SORTR.getValue().equals(transform)) { + String message = "A specification is required for this transformation"; + results.add(new ValidationResult.Builder().valid(false) + .explanation(message) + .build()); + } + } else { + try { + Object specJson = JsonUtils.jsonToObject(specValue, DEFAULT_CHARSET); + TransformationFactory.getTransform(transform, specJson); + } catch (final Exception e) { + getLogger().info("Processor is not valid - " + e.toString()); + String message = "Specification not valid for the selected transformation." ; + results.add(new ValidationResult.Builder().valid(false) + .explanation(message) + .build()); + } + } + return results; + } + + @Override + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final ProcessorLog logger = getLogger(); + final StopWatch stopWatch = new StopWatch(true); + + final byte[] originalContent = new byte[(int) original.getSize()]; + session.read(original, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, originalContent, true); + } + }); + + final String jsonString; + + try { + final ByteArrayInputStream bais = new ByteArrayInputStream(originalContent); + final Object inputJson = JsonUtils.jsonToObject(bais); + final Object transformedJson = transform.transform(inputJson); + jsonString = JsonUtils.toJsonString(transformedJson); + + } catch (RuntimeException re) { + logger.error("Unable to transform {} due to {}", new Object[]{original, re}); + session.transfer(original, REL_FAILURE); + return; + } + + FlowFile transformed = session.write(original, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write(jsonString.getBytes(DEFAULT_CHARSET)); + } + }); + + final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); + transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(),"application/json"); + session.transfer(transformed, REL_SUCCESS); + session.getProvenanceReporter().modifyContent(transformed,"Modified With " + transformType ,stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + logger.info("Transformed {}", new Object[]{original}); + + } + + @OnScheduled + public void setup(final ProcessContext context) { + Object specJson = null; + if(context.getProperty(JOLT_SPEC).isSet()){ + specJson = JsonUtils.jsonToObject(context.getProperty(JOLT_SPEC).getValue(), DEFAULT_CHARSET); + } + transform = TransformationFactory.getTransform(context.getProperty(JOLT_TRANSFORM).getValue(), specJson); + } + + private static class TransformationFactory { + + static Transform getTransform(String transform, Object specJson) { + if (transform.equals(DEFAULTR.getValue())) { + return new Defaultr(specJson); + } else if (transform.equals(SHIFTR.getValue())) { + return new Shiftr(specJson); + } else if (transform.equals(REMOVR.getValue())) { + return new Removr(specJson); + } else if (transform.equals(CARDINALITY.getValue())) { + return new CardinalityTransform(specJson); + } else if(transform.equals(SORTR.getValue())){ + return new Sortr(); + } else { + return Chainr.fromSpec(specJson); + } + } + + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 21d73a4e84..a50fd8632c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -79,6 +79,7 @@ org.apache.nifi.processors.standard.SplitJson org.apache.nifi.processors.standard.SplitText org.apache.nifi.processors.standard.SplitXml org.apache.nifi.processors.standard.TailFile +org.apache.nifi.processors.standard.TransformJSON org.apache.nifi.processors.standard.TransformXml org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.ValidateXml diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TransformJSON/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TransformJSON/additionalDetails.html new file mode 100644 index 0000000000..2141d1422c --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TransformJSON/additionalDetails.html @@ -0,0 +1,36 @@ + + + + + + TransformJSON + + + + + +

Usage Information

+ +

+ The JOLT utilities processing JSON are not not stream based therefore large JSON document + transformation may consume large amounts of memory. Currently UTF-8 FlowFile content and Jolt specifications are supported. + + Note: When configuring a processor if user selects of the Default transformation yet provides a + Chain specification the system does not alert that the specification is invalid and and will produce failed flow files. + This is a known issue identified within the JOLT library. +

+ + diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformJSON.java new file mode 100644 index 0000000000..e19d32010f --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTransformJSON.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import com.bazaarvoice.jolt.CardinalityTransform; +import com.bazaarvoice.jolt.Chainr; +import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.Diffy; +import com.bazaarvoice.jolt.JsonUtils; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Shiftr; +import com.bazaarvoice.jolt.Sortr; +import com.bazaarvoice.jolt.Transform; + +import static org.junit.Assert.assertTrue; + +public class TestTransformJSON { + + final static Path JSON_INPUT = Paths.get("src/test/resources/TestTransformJson/input.json"); + final static Diffy DIFFY = new Diffy(); + + @Test + public void testRelationshipsCreated() throws IOException{ + Processor processor= new TransformJSON(); + final TestRunner runner = TestRunners.newTestRunner(processor); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.enqueue(JSON_INPUT); + Set relationships = processor.getRelationships(); + assertTrue(relationships.contains(TransformJSON.REL_FAILURE)); + assertTrue(relationships.contains(TransformJSON.REL_SUCCESS)); + assertTrue(relationships.size() == 2); + } + + @Test + public void testInvalidJOLTSpec() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = "[{}]"; + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.assertNotValid(); + } + + @Test + public void testIncorrectJOLTSpec() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, chainrSpec); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR); + runner.assertNotValid(); + } + + @Test + public void testSpecIsNotSet() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR); + runner.assertNotValid(); + } + + @Test + public void testSpecIsEmpty() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + runner.setProperty(TransformJSON.JOLT_SPEC, StringUtils.EMPTY); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR); + runner.assertNotValid(); + } + + @Test + public void testSpecNotRequired() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SORTR); + runner.assertValid(); + } + + @Test + public void testNoFlowFileContent() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.run(); + runner.assertQueueEmpty(); + runner.assertTransferCount(TransformJSON.REL_FAILURE,0); + runner.assertTransferCount(TransformJSON.REL_SUCCESS,0); + } + + @Test + public void testInvalidFlowFileContentJson() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.enqueue("invalid json"); + runner.run(); + runner.assertAllFlowFilesTransferred(TransformJSON.REL_FAILURE); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetChainTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{ + + final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json"))); + List classes = Arrays.asList(TransformJSON.class.getDeclaredClasses()); + Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get(); + Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class); + method.setAccessible(true); + Transform transform = (Transform) method.invoke(null,TransformJSON.CHAINR.getValue(),JsonUtils.jsonToObject(chainrSpec)); + assertTrue(transform instanceof Chainr); + + } + + @Test + @SuppressWarnings("unchecked") + public void testGetRemoveTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{ + + final String removrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/removrSpec.json"))); + List classes = Arrays.asList(TransformJSON.class.getDeclaredClasses()); + Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get(); + Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class); + method.setAccessible(true); + Transform transform = (Transform) method.invoke(null,TransformJSON.REMOVR.getValue(),JsonUtils.jsonToObject(removrSpec)); + assertTrue(transform instanceof Removr); + + } + + @Test + @SuppressWarnings("unchecked") + public void testGetShiftTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{ + + final String shiftrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/shiftrSpec.json"))); + List classes = Arrays.asList(TransformJSON.class.getDeclaredClasses()); + Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get(); + Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class); + method.setAccessible(true); + Transform transform = (Transform) method.invoke(null,TransformJSON.SHIFTR.getValue(),JsonUtils.jsonToObject(shiftrSpec)); + assertTrue(transform instanceof Shiftr); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetDefaultTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{ + + final String defaultrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/defaultrSpec.json"))); + List classes = Arrays.asList(TransformJSON.class.getDeclaredClasses()); + Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get(); + Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class); + method.setAccessible(true); + Transform transform = (Transform) method.invoke(null,TransformJSON.DEFAULTR.getValue(),JsonUtils.jsonToObject(defaultrSpec)); + assertTrue(transform instanceof Defaultr); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetCardinalityTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{ + + final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/cardrSpec.json"))); + List classes = Arrays.asList(TransformJSON.class.getDeclaredClasses()); + Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get(); + Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class); + method.setAccessible(true); + Transform transform = (Transform) method.invoke(null,TransformJSON.CARDINALITY.getValue(),JsonUtils.jsonToObject(cardrSpec)); + assertTrue(transform instanceof CardinalityTransform); + } + + @Test + @SuppressWarnings("unchecked") + public void testGetSortrTransform() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{ + List classes = Arrays.asList(TransformJSON.class.getDeclaredClasses()); + Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get(); + Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class); + method.setAccessible(true); + Transform transform = (Transform) method.invoke(null,TransformJSON.SORTR.getValue(),null); + assertTrue(transform instanceof Sortr); + } + + + @Test + @SuppressWarnings("unchecked") + public void testGetInvalidTransformWithNoSpec() throws NoSuchMethodException, IOException,InvocationTargetException, IllegalAccessException{ + List classes = Arrays.asList(TransformJSON.class.getDeclaredClasses()); + Class factory = classes.stream().filter(clazz -> clazz.getSimpleName().equals("TransformationFactory")).findFirst().get(); + Method method = factory.getDeclaredMethod("getTransform", String.class, Object.class); + method.setAccessible(true); + try { + method.invoke(null, TransformJSON.CHAINR.getValue(), null); + }catch (InvocationTargetException ite){ + assertTrue(ite.getTargetException().getLocalizedMessage().equals("JOLT Chainr expects a JSON array of objects - Malformed spec.")); + } + } + + @Test + public void testTransformInputWithChainr() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/chainrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.enqueue(JSON_INPUT); + runner.run(); + runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json"); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/chainrOutput.json"))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + + @Test + public void testTransformInputWithShiftr() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/shiftrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SHIFTR); + runner.enqueue(JSON_INPUT); + runner.run(); + runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json"); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/shiftrOutput.json"))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + + @Test + public void testTransformInputWithDefaultr() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/defaultrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.DEFAULTR); + runner.enqueue(JSON_INPUT); + runner.run(); + runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/defaultrOutput.json"))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + + @Test + public void testTransformInputWithRemovr() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/removrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.REMOVR); + runner.enqueue(JSON_INPUT); + runner.run(); + runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/removrOutput.json"))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + + @Test + public void testTransformInputWithCardinality() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformJson/cardrSpec.json"))); + runner.setProperty(TransformJSON.JOLT_SPEC, spec); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.CARDINALITY); + runner.enqueue(JSON_INPUT); + runner.run(); + runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/cardrOutput.json"))); + assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); + } + + @Test + public void testTransformInputWithSortr() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new TransformJSON()); + runner.setProperty(TransformJSON.JOLT_TRANSFORM, TransformJSON.SORTR); + runner.enqueue(JSON_INPUT); + runner.run(); + runner.assertAllFlowFilesTransferred(TransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(TransformJSON.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(),"application/json"); + Object transformedJson = JsonUtils.jsonToObject(new ByteArrayInputStream(transformed.toByteArray())); + Object compareJson = JsonUtils.jsonToObject(Files.newInputStream(Paths.get("src/test/resources/TestTransformJson/sortrOutput.json"))); + String transformedJsonString = JsonUtils.toJsonString(transformedJson); + String compareJsonString = JsonUtils.toJsonString(compareJson); + assertTrue(compareJsonString.equals(transformedJsonString)); + } + + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/cardrOutput.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/cardrOutput.json new file mode 100644 index 0000000000..3dc53ab265 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/cardrOutput.json @@ -0,0 +1,13 @@ +{ + "rating": { + "primary": { + "value": 3 + }, + "quality": { + "value": 3 + }, + "series": { + "value": 5 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/cardrSpec.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/cardrSpec.json new file mode 100644 index 0000000000..fc77f182a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/cardrSpec.json @@ -0,0 +1,7 @@ +{ + "rating" : { + "series" : { + "value" : "ONE" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/chainrOutput.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/chainrOutput.json new file mode 100644 index 0000000000..2e8af88e06 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/chainrOutput.json @@ -0,0 +1,16 @@ +{ + "Range" : 5, + "Rating" : 3, + "SecondaryRatings" : { + "quality" : { + "Id" : "quality", + "Range" : 5, + "Value" : 3 + }, + "series" : { + "Id" : "series", + "Range" : 5, + "Value" : [5,4] + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/chainrSpec.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/chainrSpec.json new file mode 100644 index 0000000000..7884abfe38 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/chainrSpec.json @@ -0,0 +1,29 @@ +[ + { + "operation": "shift", + "spec": { + "rating": { + "primary": { + "value": "Rating", + "max": "RatingRange" + }, + "*": { + "max": "SecondaryRatings.&1.Range", + "value": "SecondaryRatings.&1.Value", + "$": "SecondaryRatings.&1.Id" + } + } + } + }, + { + "operation": "default", + "spec": { + "Range": 5, + "SecondaryRatings": { + "*": { + "Range": 5 + } + } + } + } +] diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/defaultrOutput.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/defaultrOutput.json new file mode 100644 index 0000000000..87581f4497 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/defaultrOutput.json @@ -0,0 +1,24 @@ +{ + "RatingRange" : 5, + "rating": { + "primary": { + "value": 3, + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + }, + "quality": { + "value": 3, + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + }, + "series": { + "value": [5,4], + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + } + + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/defaultrSpec.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/defaultrSpec.json new file mode 100644 index 0000000000..2f2ea9f662 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/defaultrSpec.json @@ -0,0 +1,10 @@ + { + "RatingRange" : 5, + "rating": { + "*": { + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + } + } + } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/input.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/input.json new file mode 100644 index 0000000000..12d85dbe98 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/input.json @@ -0,0 +1,13 @@ +{ + "rating": { + "primary": { + "value": 3 + }, + "series": { + "value": [5,4] + }, + "quality": { + "value": 3 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/removrOutput.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/removrOutput.json new file mode 100644 index 0000000000..da0de3a2d6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/removrOutput.json @@ -0,0 +1,10 @@ +{ + "rating": { + "primary": { + "value": 3 + }, + "series":{ + "value":[5,4] + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/removrSpec.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/removrSpec.json new file mode 100644 index 0000000000..a6bde70b8b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/removrSpec.json @@ -0,0 +1,5 @@ +{ + "rating": { + "quality": "" + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/shiftrOutput.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/shiftrOutput.json new file mode 100644 index 0000000000..8fd6c3b8c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/shiftrOutput.json @@ -0,0 +1,8 @@ +{ + "SecondaryRatings" : { + "quality" : { + "Value" : 3, + "RatingRange" : 3 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/shiftrSpec.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/shiftrSpec.json new file mode 100644 index 0000000000..d292cdd7e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/shiftrSpec.json @@ -0,0 +1,10 @@ +{ + "rating": { + "primary": { + "value": "SecondaryRatings.quality.RatingRange" + }, + "quality": { + "value": "SecondaryRatings.quality.Value" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/sortrOutput.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/sortrOutput.json new file mode 100644 index 0000000000..ee559da9f4 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestTransformJson/sortrOutput.json @@ -0,0 +1,13 @@ +{ + "rating": { + "primary": { + "value": 3 + }, + "quality": { + "value": 3 + }, + "series": { + "value": [5,4] + } + } +} \ No newline at end of file