diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 128de680e6..1b21421031 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -709,6 +709,12 @@ language governing permissions and limitations under the License. --> 1.8.0-SNAPSHOT nar + + org.apache.nifi + nifi-jolt-record-nar + 1.8.0-SNAPSHOT + nar + diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/pom.xml b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/pom.xml new file mode 100644 index 0000000000..6004020709 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-jolt-record-bundle + 1.8.0-SNAPSHOT + + + nifi-jolt-record-nar + 1.8.0-SNAPSHOT + nar + + true + true + + + + + org.apache.nifi + nifi-standard-services-api-nar + 1.8.0-SNAPSHOT + nar + + + org.apache.nifi + nifi-jolt-record-processors + 1.8.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..5673c95a5d --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,60 @@ +nifi-jolt-record-nar +Copyright 2014-2018 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This includes derived works from the Apache Software License V2 library Jolt (https://github.com/bazaarvoice/jolt) + Copyright 2013-2014 Bazaarvoice, Inc + The derived work is adapted from com.bazaarvoice.jolt.chainr.ChainrBuilder.java, com.bazaarvoice.jolt.chainr.spec.ChainrSpec.java, + com.bazaarvoice.jolt.chainr.spec.ChainrEntry.java and can be found in the org.apache.nifi.processors.jolt.record.util.TransformFactory.java class. + +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Jolt + The following NOTICE information applies: + Copyright 2013-2014 Bazaarvoice, Inc + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. + Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml new file mode 100644 index 0000000000..aa71e27d20 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/pom.xml @@ -0,0 +1,138 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-jolt-record-bundle + 1.8.0-SNAPSHOT + + + nifi-jolt-record-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + 1.8.0-SNAPSHOT + + + org.apache.nifi + nifi-record + 1.8.0-SNAPSHOT + + + org.apache.nifi + nifi-record-serialization-service-api + + + com.bazaarvoice.jolt + jolt-core + + + com.bazaarvoice.jolt + json-utils + + + org.apache.nifi + nifi-mock + 1.8.0-SNAPSHOT + test + + + org.apache.nifi + nifi-mock-record-utils + 1.8.0-SNAPSHOT + test + + + org.apache.nifi + nifi-record-serialization-services + 1.8.0-SNAPSHOT + test + + + org.apache.nifi + nifi-schema-registry-service-api + 1.8.0-SNAPSHOT + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/TestJoltTransformRecord/input.json + src/test/resources/TestJoltTransformRecord/chainrSpec.json + src/test/resources/TestJoltTransformRecord/customChainrSpec.json + src/test/resources/TestJoltTransformRecord/chainrOutput.json + src/test/resources/TestJoltTransformRecord/cardrSpec.json + src/test/resources/TestJoltTransformRecord/cardrOutput.json + src/test/resources/TestJoltTransformRecord/defaultrSpec.json + src/test/resources/TestJoltTransformRecord/defaultrOutput.json + src/test/resources/TestJoltTransformRecord/shiftrSpec.json + src/test/resources/TestJoltTransformRecord/sortrOutput.json + src/test/resources/TestJoltTransformRecord/shiftrOutput.json + src/test/resources/TestJoltTransformRecord/removrSpec.json + src/test/resources/TestJoltTransformRecord/removrOutput.json + src/test/resources/TestJoltTransformRecord/defaultrSpec.json + src/test/resources/TestJoltTransformRecord/defaultrOutput.json + src/test/resources/TestJoltTransformRecord/defaultrELSpec.json + src/test/resources/TestJoltTransformRecord/defaultrELOutput.json + src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json + src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json + src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json + src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json + src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json + src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json + src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json + src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json + src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json + src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json + src/test/resources/TestTransformFactory/chainrSpec.json + src/test/resources/TestTransformFactory/cardrSpec.json + src/test/resources/TestTransformFactory/defaultrSpec.json + src/test/resources/TestTransformFactory/shiftrSpec.json + src/test/resources/TestTransformFactory/removrSpec.json + src/test/resources/TestTransformFactory/defaultrSpec.json + src/test/resources/TestTransformFactory/modifierDefaultSpec.json + src/test/resources/TestTransformFactory/modifierDefineSpec.json + src/test/resources/TestTransformFactory/modifierOverwriteSpec.json + + + + + + diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java new file mode 100644 index 0000000000..86b721ff47 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/JoltTransformRecord.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.jolt.record; + +import com.bazaarvoice.jolt.ContextualTransform; +import com.bazaarvoice.jolt.JoltTransform; +import com.bazaarvoice.jolt.JsonUtils; +import com.bazaarvoice.jolt.Transform; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.jolt.record.util.TransformFactory; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; + +import java.io.FilenameFilter; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttributes({ + @WritesAttribute(attribute = "record.count", description = "The number of records in an outgoing FlowFile"), + @WritesAttribute(attribute = "mime.type", description = "The MIME Type that the configured Record Writer indicates is appropriate"), +}) +@CapabilityDescription("Applies a list of Jolt specifications to the FlowFile payload. A new FlowFile is created " + + "with transformed content and is routed to the 'success' relationship. If the transform " + + "fails, the original FlowFile is routed to the 'failure' relationship.") +public class JoltTransformRecord extends AbstractProcessor { + + static final AllowableValue SHIFTR + = new AllowableValue("jolt-transform-shift", "Shift", "Shift input data to create the output."); + static final AllowableValue CHAINR + = new AllowableValue("jolt-transform-chain", "Chain", "Execute list of Jolt transformations."); + static final AllowableValue DEFAULTR + = new AllowableValue("jolt-transform-default", "Default", " Apply default values to the output."); + static final AllowableValue REMOVR + = new AllowableValue("jolt-transform-remove", "Remove", " Remove values from input data to create the output."); + static final AllowableValue CARDINALITY + = new AllowableValue("jolt-transform-card", "Cardinality", "Change the cardinality of input elements to create the output."); + static final AllowableValue SORTR + = new AllowableValue("jolt-transform-sort", "Sort", "Sort input field name values alphabetically. Any specification set is ignored."); + static final AllowableValue CUSTOMR + = new AllowableValue("jolt-transform-custom", "Custom", "Custom Transformation. Requires Custom Transformation Class Name"); + static final AllowableValue MODIFIER_DEFAULTR + = new AllowableValue("jolt-transform-modify-default", "Modify - Default", "Writes when field name is missing or value is null"); + static final AllowableValue MODIFIER_OVERWRITER + = new AllowableValue("jolt-transform-modify-overwrite", "Modify - Overwrite", " Always overwrite value"); + static final AllowableValue MODIFIER_DEFINER + = new AllowableValue("jolt-transform-modify-define", "Modify - Define", "Writes when key is missing"); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("jolt-record-record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("jolt-record-record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final PropertyDescriptor JOLT_TRANSFORM = new PropertyDescriptor.Builder() + .name("jolt-record-transform") + .displayName("Jolt Transformation DSL") + .description("Specifies the Jolt Transformation that should be used with the provided specification.") + .required(true) + .allowableValues(CARDINALITY, CHAINR, DEFAULTR, MODIFIER_DEFAULTR, MODIFIER_DEFINER, MODIFIER_OVERWRITER, REMOVR, SHIFTR, SORTR, CUSTOMR) + .defaultValue(CHAINR.getValue()) + .build(); + + static final PropertyDescriptor JOLT_SPEC = new PropertyDescriptor.Builder() + .name("jolt-record-spec") + .displayName("Jolt Specification") + .description("Jolt Specification for transform of record data. This value is ignored if the Jolt Sort Transformation is selected.") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + + static final PropertyDescriptor CUSTOM_CLASS = new PropertyDescriptor.Builder() + .name("jolt-record-custom-class") + .displayName("Custom Transformation Class Name") + .description("Fully Qualified Class Name for Custom Transformation") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor MODULES = new PropertyDescriptor.Builder() + .name("jolt-record-custom-modules") + .displayName("Custom Module Directory") + .description("Comma-separated list of paths to files and/or directories which contain modules containing custom transformations (that are not included on NiFi's classpath).") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamicallyModifiesClasspath(true) + .build(); + + static final PropertyDescriptor TRANSFORM_CACHE_SIZE = new PropertyDescriptor.Builder() + .name("jolt-record-transform-cache-size") + .displayName("Transform Cache Size") + .description("Compiling a Jolt Transform can be fairly expensive. Ideally, this will be done only once. However, if the Expression Language is used in the transform, we may need " + + "a new Transform for each FlowFile. This value controls how many of those Transforms we cache in memory in order to avoid having to compile the Transform each time.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("The FlowFile with transformed content will be routed to this relationship") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship") + .build(); + + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be sent to this relationship") + .build(); + + private final static List properties; + private final static Set relationships; + private final static String DEFAULT_CHARSET = "UTF-8"; + + // Cache is guarded by synchronizing on 'this'. + private volatile int maxTransformsToCache = 10; + private final Map transformCache = new LinkedHashMap() { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + final boolean evict = size() > maxTransformsToCache; + if (evict) { + getLogger().debug("Removing Jolt Transform from cache because cache is full"); + } + return evict; + } + }; + + static { + final List _properties = new ArrayList<>(); + _properties.add(RECORD_READER); + _properties.add(RECORD_WRITER); + _properties.add(JOLT_TRANSFORM); + _properties.add(CUSTOM_CLASS); + _properties.add(MODULES); + _properties.add(JOLT_SPEC); + _properties.add(TRANSFORM_CACHE_SIZE); + properties = Collections.unmodifiableList(_properties); + + final Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + _relationships.add(REL_ORIGINAL); + relationships = Collections.unmodifiableSet(_relationships); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List results = new ArrayList<>(super.customValidate(validationContext)); + final String transform = validationContext.getProperty(JOLT_TRANSFORM).getValue(); + final String customTransform = validationContext.getProperty(CUSTOM_CLASS).getValue(); + + if (!validationContext.getProperty(JOLT_SPEC).isSet() || StringUtils.isEmpty(validationContext.getProperty(JOLT_SPEC).getValue())) { + if (!SORTR.getValue().equals(transform)) { + final String message = "A specification is required for this transformation"; + results.add(new ValidationResult.Builder().valid(false) + .explanation(message) + .build()); + } + } else { + try { + final String specValue = validationContext.getProperty(JOLT_SPEC).getValue(); + + if (validationContext.isExpressionLanguagePresent(specValue)) { + final String invalidExpressionMsg = validationContext.newExpressionLanguageCompiler().validateExpression(specValue, true); + if (!StringUtils.isEmpty(invalidExpressionMsg)) { + results.add(new ValidationResult.Builder().valid(false) + .subject(JOLT_SPEC.getDisplayName()) + .explanation("Invalid Expression Language: " + invalidExpressionMsg) + .build()); + } + } else { + //for validation we want to be able to ensure the spec is syntactically correct and not try to resolve variables since they may not exist yet + Object specJson = SORTR.getValue().equals(transform) ? null : JsonUtils.jsonToObject(specValue.replaceAll("\\$\\{", "\\\\\\\\\\$\\{"), DEFAULT_CHARSET); + + if (CUSTOMR.getValue().equals(transform)) { + if (StringUtils.isEmpty(customTransform)) { + final String customMessage = "A custom transformation class should be provided. "; + results.add(new ValidationResult.Builder().valid(false) + .explanation(customMessage) + .build()); + } else { + TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), customTransform, specJson); + } + } else { + TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), transform, specJson); + } + } + } catch (final Exception e) { + getLogger().info("Processor is not valid - " + e.toString()); + String message = "Specification not valid for the selected transformation."; + results.add(new ValidationResult.Builder().valid(false) + .explanation(message) + .build()); + } + } + + return results; + } + + @SuppressWarnings("unchecked") + @Override + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final ComponentLog logger = getLogger(); + final StopWatch stopWatch = new StopWatch(true); + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + final RecordSchema schema; + final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); + try (final InputStream in = session.read(original); + final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema()); + Record record; + + FlowFile transformed = session.create(original); + final Map attributes = new HashMap<>(); + final WriteResult writeResult; + try (final OutputStream out = session.write(transformed); + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) { + + final JoltTransform transform = getTransform(context, original); + writer.beginRecordSet(); + while ((record = reader.nextRecord()) != null) { + Map recordMap = (Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); + // JOLT expects arrays to be of type List where our Record code uses Object[]. + // Make another pass of the transformed objects to change Object[] to List. + recordMap = (Map) normalizeJoltObjects(recordMap); + Object transformedObject = transform(transform, recordMap); + // JOLT expects arrays to be of type List where our Record code uses Object[]. + // Make another pass of the transformed objects to change List to Object[]. + Record r = DataTypeUtils.toRecord(normalizeRecordObjects(transformedObject), schema, "r"); + writer.write(r); + } + writeResult = writer.finishRecordSet(); + + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + } catch (Exception e) { + logger.error("Unable to write transformed records {} due to {}", new Object[]{original, e.toString(), e}); + session.remove(transformed); + session.transfer(original, REL_FAILURE); + return; + } + + final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); + transformed = session.putAllAttributes(transformed, attributes); + session.transfer(transformed, REL_SUCCESS); + session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(original, REL_ORIGINAL); + logger.debug("Transformed {}", new Object[]{original}); + + + } catch (final Exception ex) { + logger.error("Unable to transform {} due to {}", new Object[]{original, ex.toString(), ex}); + session.transfer(original, REL_FAILURE); + return; + } + } + + private JoltTransform getTransform(final ProcessContext context, final FlowFile flowFile) throws Exception { + final String specString; + if (context.getProperty(JOLT_SPEC).isSet()) { + specString = context.getProperty(JOLT_SPEC).evaluateAttributeExpressions(flowFile).getValue(); + } else { + specString = null; + } + + // Get the transform from our cache, if it exists. + JoltTransform transform; + synchronized (this) { + transform = transformCache.get(specString); + } + + if (transform != null) { + return transform; + } + + // If no transform for our spec, create the transform. + final Object specJson; + if (context.getProperty(JOLT_SPEC).isSet() && !SORTR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { + specJson = JsonUtils.jsonToObject(specString, DEFAULT_CHARSET); + } else { + specJson = null; + } + + if (CUSTOMR.getValue().equals(context.getProperty(JOLT_TRANSFORM).getValue())) { + transform = TransformFactory.getCustomTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(CUSTOM_CLASS).getValue(), specJson); + } else { + transform = TransformFactory.getTransform(Thread.currentThread().getContextClassLoader(), context.getProperty(JOLT_TRANSFORM).getValue(), specJson); + } + + // Check again for the transform in our cache, since it's possible that another thread has + // already populated it. If absent from the cache, populate the cache. Otherwise, use the + // value from the cache. + synchronized (this) { + final JoltTransform existingTransform = transformCache.get(specString); + if (existingTransform == null) { + transformCache.put(specString, transform); + } else { + transform = existingTransform; + } + } + + return transform; + } + + @OnScheduled + public synchronized void setup(final ProcessContext context) { + transformCache.clear(); + maxTransformsToCache = context.getProperty(TRANSFORM_CACHE_SIZE).asInteger(); + } + + protected FilenameFilter getJarFilenameFilter() { + return (dir, name) -> (name != null && name.endsWith(".jar")); + } + + protected static Object transform(JoltTransform joltTransform, Object input) { + return joltTransform instanceof ContextualTransform + ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); + } + + /** + * Recursively replace List objects with Object[]. JOLT expects arrays to be of type List where our Record code uses Object[]. + * + * @param o The object to normalize with respect to JOLT + */ + @SuppressWarnings("unchecked") + protected static Object normalizeJoltObjects(final Object o) { + if (o instanceof Map) { + Map m = ((Map) o); + m.forEach((k, v) -> m.put(k, normalizeJoltObjects(v))); + return m; + } else if (o instanceof Object[]) { + return Arrays.stream(((Object[]) o)).map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); + } else if (o instanceof Collection) { + Collection c = (Collection) o; + return c.stream().map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); + } else { + return o; + } + } + + @SuppressWarnings("unchecked") + protected static Object normalizeRecordObjects(final Object o) { + if (o instanceof Map) { + Map m = ((Map) o); + m.forEach((k, v) -> m.put(k, normalizeRecordObjects(v))); + return m; + } else if (o instanceof List) { + return ((List) o).stream().map(JoltTransformRecord::normalizeRecordObjects).toArray(Object[]::new); + } else if (o instanceof Collection) { + Collection c = (Collection) o; + return c.stream().map(JoltTransformRecord::normalizeRecordObjects).collect(Collectors.toList()); + } else { + return o; + } + } + +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/util/TransformFactory.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/util/TransformFactory.java new file mode 100644 index 0000000000..d963936074 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/java/org/apache/nifi/processors/jolt/record/util/TransformFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.jolt.record.util; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.bazaarvoice.jolt.CardinalityTransform; +import com.bazaarvoice.jolt.Chainr; +import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.JoltTransform; +import com.bazaarvoice.jolt.Modifier; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Shiftr; +import com.bazaarvoice.jolt.Sortr; +import com.bazaarvoice.jolt.SpecDriven; +import com.bazaarvoice.jolt.chainr.spec.ChainrEntry; +import com.bazaarvoice.jolt.exception.SpecException; + +public class TransformFactory { + + public static JoltTransform getTransform(final ClassLoader classLoader,final String transformType, final Object specJson) throws Exception { + + if (transformType.equals("jolt-transform-default")) { + return new Defaultr(specJson); + } else if (transformType.equals("jolt-transform-shift")) { + return new Shiftr(specJson); + } else if (transformType.equals("jolt-transform-remove")) { + return new Removr(specJson); + } else if (transformType.equals("jolt-transform-card")) { + return new CardinalityTransform(specJson); + } else if(transformType.equals("jolt-transform-sort")){ + return new Sortr(); + } else if(transformType.equals("jolt-transform-modify-default")){ + return new Modifier.Defaultr(specJson); + } else if(transformType.equals("jolt-transform-modify-overwrite")){ + return new Modifier.Overwritr(specJson); + } else if(transformType.equals("jolt-transform-modify-define")){ + return new Modifier.Definr(specJson); + } else{ + return new Chainr(getChainrJoltTransformations(classLoader,specJson)); + } + + } + + @SuppressWarnings("unchecked") + public static JoltTransform getCustomTransform(final ClassLoader classLoader, final String customTransformType, final Object specJson) throws Exception { + final Class clazz = classLoader.loadClass(customTransformType); + if(SpecDriven.class.isAssignableFrom(clazz)){ + final Constructor constructor = clazz.getConstructor(Object.class); + return (JoltTransform)constructor.newInstance(specJson); + + }else{ + return (JoltTransform)clazz.newInstance(); + } + } + + + protected static List getChainrJoltTransformations(ClassLoader classLoader, Object specJson) throws Exception{ + if(!(specJson instanceof List)) { + throw new SpecException("JOLT Chainr expects a JSON array of objects - Malformed spec."); + } else { + + List operations = (List)specJson; + + if(operations.isEmpty()) { + throw new SpecException("JOLT Chainr passed an empty JSON array."); + } else { + + ArrayList entries = new ArrayList<>(operations.size()); + + for(Object chainrEntryObj : operations) { + + if(!(chainrEntryObj instanceof Map)) { + throw new SpecException("JOLT ChainrEntry expects a JSON map - Malformed spec"); + } else { + Map chainrEntryMap = (Map)chainrEntryObj; + String opString = (String) chainrEntryMap.get("operation"); + String operationClassName; + + if(opString == null) { + throw new SpecException("JOLT Chainr \'operation\' must implement Transform or ContextualTransform"); + } else { + + operationClassName = ChainrEntry.STOCK_TRANSFORMS.getOrDefault(opString, opString); + + entries.add(getCustomTransform(classLoader,operationClassName,chainrEntryMap.get("spec"))); + } + } + } + + return entries; + } + } + + } + + + + +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..44bdab7e05 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.jolt.record.JoltTransformRecord \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java new file mode 100644 index 0000000000..313781ddad --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/TestJoltTransformRecord.java @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.jolt.record; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.StringUtils; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.BiFunction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestJoltTransformRecord { + + private TestRunner runner; + private JoltTransformRecord processor; + private MockRecordParser parser; + private JsonRecordSetWriter writer; + + @Before + public void setup() throws Exception { + processor = new JoltTransformRecord(); + runner = TestRunners.newTestRunner(processor); + parser = new MockRecordParser(); + try { + runner.addControllerService("parser", parser); + } catch (InitializationException e) { + throw new IOException(e); + } + runner.enableControllerService(parser); + runner.setProperty(JoltTransformRecord.RECORD_READER, "parser"); + writer = new JsonRecordSetWriter(); + try { + runner.addControllerService("writer", writer); + } catch (InitializationException e) { + throw new IOException(e); + } + runner.setProperty(writer, "Schema Write Strategy", "full-schema-attribute"); + runner.setProperty(JoltTransformRecord.RECORD_WRITER, "writer"); + // Each test must set the Schema Access strategy and Schema, and enable the writer CS + } + + @Test + public void testRelationshipsCreated() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.enqueue(new byte[0]); + Set relationships = processor.getRelationships(); + assertTrue(relationships.contains(JoltTransformRecord.REL_FAILURE)); + assertTrue(relationships.contains(JoltTransformRecord.REL_SUCCESS)); + assertTrue(relationships.contains(JoltTransformRecord.REL_ORIGINAL)); + assertEquals(3, relationships.size()); + } + + @Test + public void testInvalidJOLTSpec() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = "[{}]"; + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.assertNotValid(); + } + + @Test + public void testIncorrectJOLTSpec() throws IOException { + final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, chainrSpec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR); + runner.assertNotValid(); + } + + @Test + public void testSpecIsNotSet() { + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR); + runner.assertNotValid(); + } + + @Test + public void testSpecIsEmpty() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, StringUtils.EMPTY); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR); + runner.assertNotValid(); + } + + @Test + public void testSpecNotRequired() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SORTR); + runner.assertValid(); + } + + @Test + public void testNoFlowFileContent() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.run(); + runner.assertQueueEmpty(); + runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0); + } + + @Test + public void testInvalidFlowFileContent() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + parser.failAfter(0); + runner.enqueue("invalid json"); + runner.run(); + runner.assertAllFlowFilesTransferred(JoltTransformRecord.REL_FAILURE); + } + + @Test + public void testCustomTransformationWithNoModule() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/customChainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform"); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR); + runner.assertNotValid(); + } + + @Test + public void testCustomTransformationWithMissingClassName() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.MODULES, customJarPath); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR); + runner.enqueue(new byte[0]); + runner.assertNotValid(); + } + + @Test + public void testCustomTransformationWithInvalidClassPath() throws IOException { + final String customJarPath = "src/test/resources/TestJoltTransformRecord/FakeCustomJar.jar"; + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform"); + runner.setProperty(JoltTransformRecord.MODULES, customJarPath); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR); + runner.enqueue(new byte[0]); + runner.assertNotValid(); + } + + @Test + public void testCustomTransformationWithInvalidClassName() throws IOException { + final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "FakeCustomJoltTransform"); + runner.setProperty(JoltTransformRecord.MODULES, customJarPath); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CUSTOMR); + runner.enqueue(new byte[0]); + runner.assertNotValid(); + } + + @Test + public void testTransformInputWithChainr() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/chainrOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testTransformInputWithShiftr() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SHIFTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutput.json"))), + new String(transformed.toByteArray())); + + } + + @Test + public void testTransformInputWithDefaultr() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testTransformInputWithRemovr() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.REMOVR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/removrOutput.json"))), + new String(transformed.toByteArray())); + + } + + @Test + public void testTransformInputWithCardinality() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.CARDINALITY); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/cardrOutput.json"))), + new String(transformed.toByteArray())); + + } + + @Test + public void testTransformInputWithSortr() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SORTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testTransformInputWithDefaultrExpressionLanguage() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR); + runner.setVariable("quota", "5"); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrELOutput.json"))), + new String(transformed.toByteArray())); + + } + + @Test + public void testTransformInputWithModifierDefault() throws IOException { + generateTestData(1, null); + // Input schema = output schema, just modifying values + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/inputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.MODIFIER_DEFAULTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testTransformInputWithModifierDefine() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.MODIFIER_DEFAULTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testTransformInputWithModifierOverwrite() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.MODIFIER_DEFAULTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testTransformInputWithSortrPopulatedSpec() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.SORTR); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, "abcd"); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/sortrOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testTransformInputCustomTransformationIgnored() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String customJarPath = "src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar"; + final String spec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrSpec.json"))); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.CUSTOM_CLASS, "TestCustomJoltTransform"); + runner.setProperty(JoltTransformRecord.MODULES, customJarPath); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR); + runner.enqueue(new byte[0]); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testJoltSpecEL() throws IOException { + generateTestData(1, null); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc"))); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(writer, "Pretty Print JSON", "true"); + runner.enableControllerService(writer); + final String spec = "${joltSpec}"; + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformRecord.DEFAULTR); + final Map attributes = Collections.singletonMap("joltSpec", + "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}"); + runner.enqueue(new byte[0], attributes); + runner.run(); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); +runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).get(0); + assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestJoltTransformRecord/defaultrOutput.json"))), + new String(transformed.toByteArray())); + } + + @Test + public void testJoltSpecInvalidEL() { + final TestRunner runner = TestRunners.newTestRunner(new JoltTransformRecord()); + final String spec = "${joltSpec:nonExistingFunction()}"; + runner.setProperty(JoltTransformRecord.JOLT_SPEC, spec); + runner.enqueue(new byte[0]); + runner.assertNotValid(); + } + + private void generateTestData(int numRecords, final BiFunction recordGenerator) { + + if (recordGenerator == null) { + final RecordSchema primarySchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema seriesSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())))); + final RecordSchema qualitySchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema ratingSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("primary", RecordFieldType.RECORD.getDataType()), + new RecordField("series", RecordFieldType.RECORD.getDataType()), + new RecordField("quality", RecordFieldType.RECORD.getDataType()) + )); + parser.addSchemaField("rating", RecordFieldType.RECORD); + + for (int i = 0; i < numRecords; i++) { + final int index = i; + + Record primaryRecord = new MapRecord(primarySchema, new HashMap() {{ + put("value", (10 * index) + 3); + }}); + Record seriesRecord = new MapRecord(seriesSchema, new HashMap() {{ + put("value", new Integer[]{(10 * index) + 5, (10 * index) + 4}); + }}); + Record qualityRecord = new MapRecord(qualitySchema, new HashMap() {{ + put("value", 3); + }}); + + + Record ratingRecord = new MapRecord(ratingSchema, new HashMap() {{ + put("primary", primaryRecord); + put("series", seriesRecord); + put("quality", qualityRecord); + }}); + + parser.addRecord(ratingRecord); + } + + + } else { + recordGenerator.apply(numRecords, parser); + } + } + +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/util/TestTransformFactory.java b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/util/TestTransformFactory.java new file mode 100644 index 0000000000..016bebf6f0 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/java/org/apache/nifi/processors/jolt/record/util/TestTransformFactory.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.jolt.record.util; + +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.junit.Test; + +import com.bazaarvoice.jolt.CardinalityTransform; +import com.bazaarvoice.jolt.Chainr; +import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.JoltTransform; +import com.bazaarvoice.jolt.JsonUtils; +import com.bazaarvoice.jolt.Modifier; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Shiftr; +import com.bazaarvoice.jolt.Sortr; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestTransformFactory { + + + @Test + public void testGetChainTransform() throws Exception { + final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain", JsonUtils.jsonToObject(chainrSpec)); + assertTrue(transform instanceof Chainr); + } + + @Test + public void testGetDefaultTransform() throws Exception { + final String defaultrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/defaultrSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-default", JsonUtils.jsonToObject(defaultrSpec)); + assertTrue(transform instanceof Defaultr); + } + + @Test + public void testGetSortTransform() throws Exception { + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-sort", null); + assertTrue(transform instanceof Sortr); + } + + @Test + public void testGetShiftTransform() throws Exception { + final String shiftrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/shiftrSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-shift", JsonUtils.jsonToObject(shiftrSpec)); + assertTrue(transform instanceof Shiftr); + } + + @Test + public void testGetRemoveTransform() throws Exception { + final String removrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/removrSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-remove", JsonUtils.jsonToObject(removrSpec)); + assertTrue(transform instanceof Removr); + } + + @Test + public void testGetCardinalityTransform() throws Exception { + final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/cardrSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-card", JsonUtils.jsonToObject(cardrSpec)); + assertTrue(transform instanceof CardinalityTransform); + } + + @Test + public void testGetModifierDefaultTransform() throws Exception { + final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/modifierDefaultSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-modify-default", JsonUtils.jsonToObject(cardrSpec)); + assertTrue(transform instanceof Modifier.Defaultr); + } + + @Test + public void testGetModifierDefineTransform() throws Exception { + final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/modifierDefineSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-modify-define", JsonUtils.jsonToObject(cardrSpec)); + assertTrue(transform instanceof Modifier.Definr); + } + + @Test + public void testGetModifierOverwriteTransform() throws Exception { + final String cardrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/modifierOverwriteSpec.json"))); + JoltTransform transform = TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-modify-overwrite", JsonUtils.jsonToObject(cardrSpec)); + assertTrue(transform instanceof Modifier.Overwritr); + } + + @Test + public void testGetInvalidTransformWithNoSpec() { + try { + TransformFactory.getTransform(getClass().getClassLoader(), "jolt-transform-chain", null); + } catch (Exception e) { + assertEquals("JOLT Chainr expects a JSON array of objects - Malformed spec.", e.getLocalizedMessage()); + } + } + + @Test + public void testGetCustomTransformation() throws Exception { + final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json"))); + Path jarFilePath = Paths.get("src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar"); + URL[] urlPaths = new URL[1]; + urlPaths[0] = jarFilePath.toUri().toURL(); + ClassLoader customClassLoader = new URLClassLoader(urlPaths, this.getClass().getClassLoader()); + JoltTransform transform = TransformFactory.getCustomTransform(customClassLoader, "TestCustomJoltTransform", JsonUtils.jsonToObject(chainrSpec)); + assertNotNull(transform); + assertEquals("TestCustomJoltTransform", transform.getClass().getName()); + } + + @Test + public void testGetCustomTransformationNotFound() throws Exception { + final String chainrSpec = new String(Files.readAllBytes(Paths.get("src/test/resources/TestTransformFactory/chainrSpec.json"))); + try { + TransformFactory.getCustomTransform(this.getClass().getClassLoader(), "TestCustomJoltTransform", chainrSpec); + } catch (ClassNotFoundException cnf) { + assertEquals("TestCustomJoltTransform", cnf.getLocalizedMessage()); + } + } + + +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar new file mode 100644 index 0000000000..b738658f1c Binary files /dev/null and b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/TestCustomJoltTransform.jar differ diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json new file mode 100644 index 0000000000..7910e1ab5d --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutput.json @@ -0,0 +1,13 @@ +[ { + "rating" : { + "primary" : { + "value" : 3 + }, + "quality" : { + "value" : 3 + }, + "series" : { + "value" : 5 + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc new file mode 100644 index 0000000000..94793b0eb0 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrOutputSchema.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "input", + "fields": [ + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "value", "type": "int"} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrSpec.json new file mode 100644 index 0000000000..fc77f182a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/cardrSpec.json @@ -0,0 +1,7 @@ +{ + "rating" : { + "series" : { + "value" : "ONE" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json new file mode 100644 index 0000000000..2065f6356d --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutput.json @@ -0,0 +1,16 @@ +[ { + "Range" : 5, + "Rating" : 3, + "SecondaryRatings" : { + "quality" : { + "Id" : "quality", + "Range" : 5, + "Value" : 3 + }, + "series" : { + "Id" : "series", + "Range" : 5, + "Value" : [ 5, 4 ] + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc new file mode 100644 index 0000000000..eebf0ec644 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrOutputSchema.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "output", + "fields": [ + {"name": "Range", "type": "int"}, + {"name": "Rating", "type": "int"}, + {"name": "SecondaryRatings", "type": { + "name": "SecondaryRatingsRecord", "type": "record", "fields":[ + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "Id", "type": ["null", "string"]}, + {"name": "Range", "type": ["null", "int"]}, + {"name": "Value", "type": ["null", "int"]} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "Id", "type": ["null", "string"]}, + {"name": "Range", "type": ["null", "int"]}, + {"name": "Value", "type": { "type": "array", "items": "int" }} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrSpec.json new file mode 100644 index 0000000000..7884abfe38 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/chainrSpec.json @@ -0,0 +1,29 @@ +[ + { + "operation": "shift", + "spec": { + "rating": { + "primary": { + "value": "Rating", + "max": "RatingRange" + }, + "*": { + "max": "SecondaryRatings.&1.Range", + "value": "SecondaryRatings.&1.Value", + "$": "SecondaryRatings.&1.Id" + } + } + } + }, + { + "operation": "default", + "spec": { + "Range": 5, + "SecondaryRatings": { + "*": { + "Range": 5 + } + } + } + } +] diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/customChainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/customChainrSpec.json new file mode 100644 index 0000000000..eef0933441 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/customChainrSpec.json @@ -0,0 +1,35 @@ +[ + { + "operation":"TestCustomJoltTransform", + "spec" : + [ + { + "operation": "shift", + "spec": { + "rating": { + "primary": { + "value": "Rating", + "max": "RatingRange" + }, + "*": { + "max": "SecondaryRatings.&1.Range", + "value": "SecondaryRatings.&1.Value", + "$": "SecondaryRatings.&1.Id" + } + } + } + }, + { + "operation": "default", + "spec": { + "Range": 5, + "SecondaryRatings": { + "*": { + "Range": 5 + } + } + } + } + ] + } +] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json new file mode 100644 index 0000000000..92c595dce4 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutput.json @@ -0,0 +1,26 @@ +[ { + "RatingRange" : 5, + "rating" : { + "primary" : { + "value" : 3, + "MaxLabel" : "High", + "MinLabel" : "Low", + "DisplayType" : "NORMAL" + }, + "quality" : { + "value" : 3, + "MaxLabel" : "High", + "MinLabel" : "Low", + "DisplayType" : "NORMAL" + }, + "series" : { + "value" : [ 5, 4 ], + "MaxLabel" : "High", + "MinLabel" : "Low", + "DisplayType" : "NORMAL" + }, + "quota" : { + "value" : "5" + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc new file mode 100644 index 0000000000..6b1ab6264e --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELOutputSchema.avsc @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "output", + "fields": [ + {"name": "RatingRange", "type": "int"}, + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]}, + {"name": "MaxLabel", "type": "string"}, + {"name": "MinLabel", "type": "string"}, + {"name": "DisplayType", "type": "string"} + ] + } + }, + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]}, + {"name": "MaxLabel", "type": "string"}, + {"name": "MinLabel", "type": "string"}, + {"name": "DisplayType", "type": "string"} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "value", "type": { "type": "array", "items": "int" }}, + {"name": "MaxLabel", "type": "string"}, + {"name": "MinLabel", "type": "string"}, + {"name": "DisplayType", "type": "string"} + ] + } + }, + { "name": "quota", "type": { + "name": "quotaRecord", "type": "record", "fields":[ + {"name": "value", "type": "string"} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELSpec.json new file mode 100644 index 0000000000..b06f10aaab --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrELSpec.json @@ -0,0 +1,26 @@ +{ + "RatingRange" : 5, + "rating": { + "primary": { + "value": 3, + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + }, + "quality": { + "value": 3, + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + }, + "series": { + "value": [5,4], + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + }, + "quota":{ + "value": "${quota}" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json new file mode 100644 index 0000000000..81b97848d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutput.json @@ -0,0 +1,23 @@ +[ { + "RatingRange" : 5, + "rating" : { + "primary" : { + "value" : 3, + "MaxLabel" : "High", + "MinLabel" : "Low", + "DisplayType" : "NORMAL" + }, + "quality" : { + "value" : 3, + "MaxLabel" : "High", + "MinLabel" : "Low", + "DisplayType" : "NORMAL" + }, + "series" : { + "value" : [ 5, 4 ], + "MaxLabel" : "High", + "MinLabel" : "Low", + "DisplayType" : "NORMAL" + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc new file mode 100644 index 0000000000..979280c066 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrOutputSchema.avsc @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "output", + "fields": [ + {"name": "RatingRange", "type": "int"}, + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]}, + {"name": "MaxLabel", "type": "string"}, + {"name": "MinLabel", "type": "string"}, + {"name": "DisplayType", "type": "string"} + ] + } + }, + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]}, + {"name": "MaxLabel", "type": "string"}, + {"name": "MinLabel", "type": "string"}, + {"name": "DisplayType", "type": "string"} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "value", "type": { "type": "array", "items": "int" }}, + {"name": "MaxLabel", "type": "string"}, + {"name": "MinLabel", "type": "string"}, + {"name": "DisplayType", "type": "string"} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrSpec.json new file mode 100644 index 0000000000..2f2ea9f662 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/defaultrSpec.json @@ -0,0 +1,10 @@ + { + "RatingRange" : 5, + "rating": { + "*": { + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + } + } + } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json new file mode 100644 index 0000000000..12d85dbe98 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/input.json @@ -0,0 +1,13 @@ +{ + "rating": { + "primary": { + "value": 3 + }, + "series": { + "value": [5,4] + }, + "quality": { + "value": 3 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc new file mode 100644 index 0000000000..247a491590 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/inputSchema.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "input", + "fields": [ + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "value", "type": { "type": "array", "items": "int" }} + ] + } + }, + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json new file mode 100644 index 0000000000..e41ebb4d8f --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultOutput.json @@ -0,0 +1,13 @@ +[ { + "rating" : { + "primary" : { + "value" : 0 + }, + "series" : { + "value" : [ 5, 4 ] + }, + "quality" : { + "value" : 3 + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json new file mode 100644 index 0000000000..a351a98c74 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefaultSpec.json @@ -0,0 +1,7 @@ +{ + "rating": { + "primary?": { + "+value": 0 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json new file mode 100644 index 0000000000..f3071203bb --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutput.json @@ -0,0 +1,16 @@ +[ { + "rating" : { + "primary" : { + "value" : 3 + }, + "series" : { + "value" : [ 5, 4 ] + }, + "quality" : { + "value" : 3 + }, + "question" : { + "value" : 0 + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc new file mode 100644 index 0000000000..5586c65a04 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineOutputSchema.avsc @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "input", + "fields": [ + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "value", "type": { "type": "array", "items": "int" }} + ] + } + }, + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "question", "type": { + "name": "questionRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json new file mode 100644 index 0000000000..e4d9ec8a33 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierDefineSpec.json @@ -0,0 +1,7 @@ +{ + "rating": { + "question": { + "value": 0 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json new file mode 100644 index 0000000000..ef773ff07a --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutput.json @@ -0,0 +1,14 @@ +[ { + "rating" : { + "primary" : { + "value" : 3 + }, + "quality" : { + "value" : 3 + }, + "series" : { + "series_first" : 5, + "value" : [ 5, 4 ] + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc new file mode 100644 index 0000000000..7cd896c696 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteOutputSchema.avsc @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "input", + "fields": [ + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "series_first", "type": "int"}, + {"name": "value", "type": { "type": "array", "items": "int" }} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json new file mode 100644 index 0000000000..33b5e2a4e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/modifierOverwriteSpec.json @@ -0,0 +1,7 @@ +{ + "rating": { + "series": { + "series_first": "=firstElement(@(1,value))" + } + } +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json new file mode 100644 index 0000000000..f25a093d51 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutput.json @@ -0,0 +1,12 @@ +[ { + "Rating" : [ 3, 13 ], + "SecondaryRatings" : { + "quality" : { + "Id" : [ "quality", "quality" ] + }, + "series" : { + "Id" : [ "series", "series" ] + }, + "Range" : 5 + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc new file mode 100644 index 0000000000..d88b05c7ee --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrOutputSchema.avsc @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "output", + "fields": [ + {"name": "Rating", "type": {"type": "array", "items": "int"}}, + {"name": "SecondaryRatings", "type": { + "name": "SecondaryRatingsRecord", "type": "record", "fields":[ + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "Id", "type": {"type": "array", "items": "string"}} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "Id", "type": {"type": "array", "items": "string"}} + ] + } + }, + {"name": "Range", "type": "int"} + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json new file mode 100644 index 0000000000..44961179a3 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleChainrSpec.json @@ -0,0 +1,32 @@ +[ + { + "operation": "shift", + "spec": { + "*": { + "rating": { + "primary": { + "value": "Rating", + "max": "RatingRange" + }, + "*": { + "max": "SecondaryRatings.&1.Range", + "$": "SecondaryRatings.&1.Id" + } + } + } + } + }, + { + "operation": "default", + "spec": { + "*": { + "Range": 5, + "SecondaryRatings": { + "*": { + "Range": 5 + } + } + } + } + } +] diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json new file mode 100644 index 0000000000..391e0b35ad --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutput.json @@ -0,0 +1,5 @@ +[ { + "primary_value" : 3 +}, { + "primary_value" : 13 +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc new file mode 100644 index 0000000000..ee81ff15a3 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrOutputSchema.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "output", + "fields": [ + {"name": "primary_value", "type": "int"} + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json new file mode 100644 index 0000000000..e60b42e152 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/multipleToMultipleChainrSpec.json @@ -0,0 +1,13 @@ +[ + { + "operation": "shift", + "spec": { + "*": { + "rating": { + "primary": { + "value": "[#4].primary_value" + } + } + } + } + }] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrOutput.json new file mode 100644 index 0000000000..405a984e4b --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrOutput.json @@ -0,0 +1,10 @@ +[ { + "rating" : { + "primary" : { + "value" : 3 + }, + "series" : { + "value" : [ 5, 4 ] + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc new file mode 100644 index 0000000000..fb4353203a --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrOutputSchema.avsc @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "input", + "fields": [ + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "value", "type": { "type": "array", "items": "int" }} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrSpec.json new file mode 100644 index 0000000000..a6bde70b8b --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/removrSpec.json @@ -0,0 +1,5 @@ +{ + "rating": { + "quality": "" + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrOutput.json new file mode 100644 index 0000000000..d73e3a2ce0 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrOutput.json @@ -0,0 +1,8 @@ +[ { + "SecondaryRatings" : { + "quality" : { + "Value" : 3, + "RatingRange" : 3 + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc new file mode 100644 index 0000000000..7b1b0547ab --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrOutputSchema.avsc @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "output", + "fields": [ + {"name": "SecondaryRatings", "type": { + "name": "SecondaryRatingsRecord", "type": "record", "fields":[ + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "Value", "type": ["null", "int"]}, + {"name": "RatingRange", "type": ["null", "int"]} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrSpec.json new file mode 100644 index 0000000000..d292cdd7e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/shiftrSpec.json @@ -0,0 +1,10 @@ +{ + "rating": { + "primary": { + "value": "SecondaryRatings.quality.RatingRange" + }, + "quality": { + "value": "SecondaryRatings.quality.Value" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/sortrOutput.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/sortrOutput.json new file mode 100644 index 0000000000..a45e432fc8 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/sortrOutput.json @@ -0,0 +1,13 @@ +[ { + "rating" : { + "primary" : { + "value" : 3 + }, + "quality" : { + "value" : 3 + }, + "series" : { + "value" : [ 5, 4 ] + } + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc new file mode 100644 index 0000000000..d3052a3b0f --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestJoltTransformRecord/sortrOutputSchema.avsc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{"namespace": "example.avro", + "type": "record", + "name": "output", + "fields": [ + {"name": "rating", "type": { + "name": "ratingRecord", "type": "record", "fields":[ + { "name": "primary", "type": { + "name": "primaryRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "quality", "type": { + "name": "qualityRecord", "type": "record", "fields":[ + {"name": "value", "type": ["null", "int"]} + ] + } + }, + { "name": "series", "type": { + "name": "seriesRecord", "type": "record", "fields":[ + {"name": "value", "type": { "type": "array", "items": "int" }} + ] + } + } + ] + } + } + ] +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar new file mode 100644 index 0000000000..b738658f1c Binary files /dev/null and b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/TestCustomJoltTransform.jar differ diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/cardrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/cardrSpec.json new file mode 100644 index 0000000000..fc77f182a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/cardrSpec.json @@ -0,0 +1,7 @@ +{ + "rating" : { + "series" : { + "value" : "ONE" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/chainrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/chainrSpec.json new file mode 100644 index 0000000000..7884abfe38 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/chainrSpec.json @@ -0,0 +1,29 @@ +[ + { + "operation": "shift", + "spec": { + "rating": { + "primary": { + "value": "Rating", + "max": "RatingRange" + }, + "*": { + "max": "SecondaryRatings.&1.Range", + "value": "SecondaryRatings.&1.Value", + "$": "SecondaryRatings.&1.Id" + } + } + } + }, + { + "operation": "default", + "spec": { + "Range": 5, + "SecondaryRatings": { + "*": { + "Range": 5 + } + } + } + } +] diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/defaultrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/defaultrSpec.json new file mode 100644 index 0000000000..2f2ea9f662 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/defaultrSpec.json @@ -0,0 +1,10 @@ + { + "RatingRange" : 5, + "rating": { + "*": { + "MaxLabel": "High", + "MinLabel": "Low", + "DisplayType": "NORMAL" + } + } + } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierDefaultSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierDefaultSpec.json new file mode 100644 index 0000000000..a351a98c74 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierDefaultSpec.json @@ -0,0 +1,7 @@ +{ + "rating": { + "primary?": { + "+value": 0 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierDefineSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierDefineSpec.json new file mode 100644 index 0000000000..e4d9ec8a33 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierDefineSpec.json @@ -0,0 +1,7 @@ +{ + "rating": { + "question": { + "value": 0 + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierOverwriteSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierOverwriteSpec.json new file mode 100644 index 0000000000..33b5e2a4e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/modifierOverwriteSpec.json @@ -0,0 +1,7 @@ +{ + "rating": { + "series": { + "series_first": "=firstElement(@(1,value))" + } + } +} diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/removrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/removrSpec.json new file mode 100644 index 0000000000..a6bde70b8b --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/removrSpec.json @@ -0,0 +1,5 @@ +{ + "rating": { + "quality": "" + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/shiftrSpec.json b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/shiftrSpec.json new file mode 100644 index 0000000000..d292cdd7e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/nifi-jolt-record-processors/src/test/resources/TestTransformFactory/shiftrSpec.json @@ -0,0 +1,10 @@ +{ + "rating": { + "primary": { + "value": "SecondaryRatings.quality.RatingRange" + }, + "quality": { + "value": "SecondaryRatings.quality.Value" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-jolt-record-bundle/pom.xml b/nifi-nar-bundles/nifi-jolt-record-bundle/pom.xml new file mode 100644 index 0000000000..3a7c4fae36 --- /dev/null +++ b/nifi-nar-bundles/nifi-jolt-record-bundle/pom.xml @@ -0,0 +1,72 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 1.8.0-SNAPSHOT + + + nifi-jolt-record-bundle + 1.8.0-SNAPSHOT + pom + + + nifi-jolt-record-processors + nifi-jolt-record-nar + + + + 2.9.5 + 2.2.0 + 0.1.1 + 2.26 + + + + + + org.apache.nifi + nifi-jolt-record-processors + 1.8.0-SNAPSHOT + + + org.apache.nifi + nifi-utils + 1.8.0-SNAPSHOT + + + org.apache.nifi + nifi-record-serialization-service-api + 1.8.0-SNAPSHOT + + + com.bazaarvoice.jolt + jolt-core + ${jolt.version} + + + com.bazaarvoice.jolt + json-utils + ${jolt.version} + + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 309be2afe4..d3b12d9dd9 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -93,6 +93,7 @@ nifi-spark-bundle nifi-atlas-bundle nifi-druid-bundle + nifi-jolt-record-bundle