From 6a1854c9758005a67d5315f31533fdb88ec55b81 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 20 Jan 2017 10:04:48 -0500 Subject: [PATCH] NIFI-3354 Added support for simple AVRO/CSV/JSON transformers that utilize external Schema Added support for simple Key/Value Schema Registry as Controller Service Added support for registering multiple schemas as dynamic properties of Schema Registry Controller Service Added the following 8 processors - ExtractAvroFieldsViaSchemaRegistry - TransformAvroToCSVViaSchemaRegistry - TransformAvroToJsonViaSchemaRegistry - TransformCSVToAvroViaSchemaRegistry - TransformCSVToJsonViaSchemaRegistry - TransformJsonToAvroViaSchemaRegistry - TransformJsonToCSVViaSchemaRegistry - UpdateAttributeWithSchemaViaSchemaRegistry polishing NIFI-3354 Adding support for HDFS Schema Registry, unions and default values in the Avro Schema and NULL columns in the source CSV NIFI-3354 Adding support for logicalTypes per the Avro 1.7.7 spec NIFI-3354 polishing and restructuring CSVUtils NIFI-3354 renamed processors to address PR comment NIFI-3354 addressed latest PR comments - removed HDFS-based ControllerService. It will be migrated into a separate bundle as a true extension. - removed UpdateAttribute. . . processor - added mime.type attribute to all Transform* processors NIFI-3354 added missing L&N entries This closes pr/1436 --- nifi-assembly/pom.xml | 5 + .../nifi-registry-nar/pom.xml | 31 ++ .../src/main/resources/META-INF/LICENSE | 240 ++++++++++++++ .../src/main/resources/META-INF/NOTICE | 66 ++++ .../nifi-registry-processors/pom.xml | 74 +++++ .../processors/AbstractCSVTransformer.java | 57 ++++ .../AbstractContentTransformer.java | 101 ++++++ .../processors/AbstractTransformer.java | 93 ++++++ .../schemaregistry/processors/AvroUtils.java | 67 ++++ .../processors/BaseContentTransformer.java | 51 +++ .../processors/BaseTransformer.java | 189 +++++++++++ .../schemaregistry/processors/CSVUtils.java | 299 ++++++++++++++++++ .../processors/ExtractAvroFields.java | 100 ++++++ .../schemaregistry/processors/JsonUtils.java | 74 +++++ .../processors/RegistryCommon.java | 84 +++++ .../processors/TransformAvroToCSV.java | 57 ++++ .../processors/TransformAvroToJson.java | 46 +++ .../processors/TransformCSVToAvro.java | 80 +++++ .../processors/TransformCSVToJson.java | 80 +++++ .../processors/TransformJsonToAvro.java | 45 +++ .../processors/TransformJsonToCSV.java | 45 +++ .../org.apache.nifi.processor.Processor | 21 ++ .../processors/TransformersTest.java | 188 +++++++++++ .../decimal_logicalType.txt | 1 + .../decimal_logicalType_invalid_scale.txt | 1 + ...l_logicalType_valid_scale_with_default.txt | 1 + .../decimal_logicalType_with_default.txt | 1 + .../expected_ouput_csv/primitive_types.txt | 1 + .../primitive_types_with_matching_default.txt | 1 + .../union_null_last_field_with_default.txt | 1 + .../union_null_middle_field_with_default.txt | 1 + .../expected_ouput_csv/union_with_default.txt | 1 + ...logicalType_invalid_scale_with_default.txt | 16 + ...l_logicalType_valid_scale_with_default.txt | 16 + ...ogicalType_valid_scale_with_no_default.txt | 15 + .../primitive_types_no_defaults.txt | 11 + .../primitive_types_union_with_defaults.txt | 11 + .../primitive_types_with_matching_default.txt | 11 + .../primitive_types_with_mismatch_default.txt | 11 + .../union_and_matching_defaults.txt | 18 ++ .../union_and_mismatch_defaults.txt | 18 ++ .../input_csv/decimal_logicalType.txt | 1 + .../decimal_logicalType_missing_value.txt | 1 + .../resources/input_csv/primitive_types.txt | 1 + 
.../primitive_types_with_matching_default.txt | 1 + .../union_null_last_field_with_default.txt | 1 + .../union_null_middle_field_with_default.txt | 1 + .../input_csv/union_with_default.txt | 1 + .../input_csv/union_with_missing_value.txt | 1 + .../nifi-registry-service/pom.xml | 48 +++ .../services/SchemaRegistry.java | 46 +++ .../SimpleKeyValueSchemaRegistry.java | 96 ++++++ ...g.apache.nifi.controller.ControllerService | 15 + .../SimpleKeyValueSchemaRegistryTest.java | 70 ++++ nifi-nar-bundles/nifi-registry-bundle/pom.xml | 42 +++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 57 files changed, 2561 insertions(+) create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java create mode 100644 
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt create mode 100644 
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java create mode 100644 nifi-nar-bundles/nifi-registry-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 2f798fe055..77722bb4e8 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -368,6 +368,11 @@ language governing permissions and limitations under the License. --> nifi-spring-nar nar + + org.apache.nifi + nifi-registry-nar + nar + org.apache.nifi nifi-hive-nar diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml new file mode 100644 index 0000000000..dfdf214b52 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/pom.xml @@ -0,0 +1,31 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-registry-bundle + 1.2.0-SNAPSHOT + + + nifi-registry-nar + nar + + + org.apache.nifi + nifi-registry-processors + + + diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE new file mode 100644 index 0000000000..70db055e37 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/LICENSE @@ -0,0 +1,240 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ + APACHE NIFI SUBCOMPONENTS: + +The Apache NiFi project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + + The binary distribution of this product bundles 'Paranamer Core' which is available + under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..73a4e4ea76 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,66 @@ +nifi-registry-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2016 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2013 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Compress
+    The following NOTICE information applies:
+      Apache Commons Compress
+      Copyright 2002-2014 The Apache Software Foundation
+
+  (ASLv2) Snappy Java
+    The following NOTICE information applies:
+      This product includes software developed by Google
+       Snappy: http://code.google.com/p/snappy/ (New BSD License)
+
+      This product includes software developed by Apache
+       PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
+       (Apache 2.0 license)
+
+      This library contains statically linked libstdc++. This inclusion is allowed by
+      "GCC Runtime Library Exception"
+      http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may be licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml new file mode 100644 index 0000000000..0ea83eedb8 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/pom.xml @@ -0,0 +1,74 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-registry-bundle + 1.2.0-SNAPSHOT + + + nifi-registry-processors + jar + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/expected_ouput_csv/* + src/test/resources/input_avro/* + src/test/resources/input_csv/* + + + + + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + + + org.apache.avro + avro + + + org.codehaus.jackson + jackson-mapper-asl + + + org.apache.nifi + nifi-registry-service + 1.2.0-SNAPSHOT + + + org.apache.nifi + nifi-mock + test + + + pl.pragmatists + JUnitParams + 1.0.5 + test + + + diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java new file mode 100644 index 0000000000..54497dc3cc --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractCSVTransformer.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; + +/** + * Base processor for implementing transform-like processors for CSV + * transformations that integrate with Schema Registry (see + * {@link SchemaRegistry}) + */ +abstract class AbstractCSVTransformer extends AbstractContentTransformer { + + static final List BASE_CSV_DESCRIPTORS; + + static { + List descriptors = new ArrayList(); + descriptors.addAll(BASE_DESCRIPTORS); + descriptors.add(DELIMITER); + BASE_CSV_DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + protected volatile char delimiter; + + @Override + public List getSupportedPropertyDescriptors() { + return BASE_CSV_DESCRIPTORS; + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + this.delimiter = context.getProperty(DELIMITER).getValue().charAt(0); + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java new file mode 100644 index 0000000000..403b52a572 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractContentTransformer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; + +/** + * Base processor for implementing transform-like processors that integrate with + * Schema Registry (see {@link SchemaRegistry}) + */ +abstract class AbstractContentTransformer extends BaseContentTransformer implements RegistryCommon { + + static final List BASE_DESCRIPTORS; + + static { + List descriptors = new ArrayList(); + descriptors.add(REGISTRY_SERVICE); + descriptors.add(SCHEMA_NAME); + descriptors.add(SCHEMA_TYPE); + BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + volatile SchemaRegistry schemaRegistryDelegate; + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class); + } + + /** + * + */ + @Override + protected Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) { + Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties); + return this.transform(in, out, contextProperties, schema); + } + + /** + * This operation is designed to allow sub-classes to provide + * implementations that read content of the provided {@link InputStream} and + * write content (same or different) into the provided {@link OutputStream}. + * Both {@link InputStream} and {@link OutputStream} represent the content + * of the in/out {@link FlowFile} and are both required to NOT be null; + *

+     * The returned {@link Map} represents attributes that will be added to the
+     * outgoing FlowFile. It can be null, in which case no attributes will be
+     * added to the resulting {@link FlowFile}.
+     *
+     *
+     * @param in
+     *            {@link InputStream} representing data to be transformed
+     * @param out
+     *            {@link OutputStream} representing target stream to write
+     *            transformed data. Can be null if no output needs to be
+     *            written.
+     * @param contextProperties
+     *            instance of {@link InvocationContextProperties}
+     * @param schema
+     *            instance of {@link Schema}
+     */
+    protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema);
+
+    /**
+     *
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return BASE_DESCRIPTORS;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
new file mode 100644
index 0000000000..13dd4a5922
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AbstractTransformer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; + +/** + * Base processor for implementing transform-like processors that integrate with + * Schema Registry (see {@link SchemaRegistry}) + */ +abstract class AbstractTransformer extends BaseTransformer implements RegistryCommon { + + static final List BASE_DESCRIPTORS; + + static { + List descriptors = new ArrayList(); + descriptors.add(REGISTRY_SERVICE); + descriptors.add(SCHEMA_NAME); + BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + volatile SchemaRegistry schemaRegistryDelegate; + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class); + } + + /** + * This operation is designed to allow sub-classes to provide + * implementations that read content of the provided {@link InputStream} + * that represent the content of the incoming {@link FlowFile}. + *

+ * The returned {@link Map} represents attributes that will be added to the + * outgoing FlowFile. + * + * + * @param in + * {@link InputStream} representing data to be transformer + * @param contextProperties + * instance of {@link InvocationContextProperties} + * @param schema + * instance of avro {@link Schema} + */ + protected abstract Map transform(InputStream in, InvocationContextProperties contextProperties, Schema schema); + + /** + * + */ + @Override + protected List getSupportedPropertyDescriptors() { + return BASE_DESCRIPTORS; + } + + /** + * + */ + @Override + protected Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) { + Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties); + return this.transform(in, contextProperties, schema); + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java new file mode 100644 index 0000000000..b967af98c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/AvroUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.nifi.flowfile.FlowFile; + +/** + * Various Avro related utility operations relevant to transforming contents of + * the {@link FlowFile} between Avro formats. + */ +class AvroUtils { + + /** + * Reads provided {@link InputStream} into Avro {@link GenericRecord} + * applying provided {@link Schema} returning the resulting GenericRecord. + */ + public static GenericRecord read(InputStream in, Schema schema) { + GenericDatumReader datumReader = new GenericDatumReader<>(schema); + GenericRecord avroRecord = null; + try { + avroRecord = datumReader.read(null, DecoderFactory.get().binaryDecoder(in, null)); + return avroRecord; + } catch (Exception e) { + throw new IllegalStateException("Failed to read AVRO record", e); + } + } + + /** + * Writes provided {@link GenericRecord} into the provided + * {@link OutputStream}. 
+ */ + public static void write(GenericRecord record, OutputStream out) { + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + DatumWriter writer = new GenericDatumWriter<>(record.getSchema()); + try { + writer.write(record, encoder); + encoder.flush(); + } catch (Exception e) { + throw new IllegalStateException("Failed to write AVRO record", e); + } + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java new file mode 100644 index 0000000000..12586acd00 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseContentTransformer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.StreamCallback; + +/** + * Base processor which contains common functionality for processors that + * receive {@link FlowFile} and output {@link FlowFile} while also modifying the + * content of the {@link FlowFile} + */ +public abstract class BaseContentTransformer extends BaseTransformer { + + @Override + protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) { + AtomicReference> attributeRef = new AtomicReference>(); + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + attributeRef.set(transform(in, out, contextProperties)); + } + }); + if (attributeRef.get() != null) { + flowFile = session.putAllAttributes(flowFile, attributeRef.get()); + } + return flowFile; + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java new file mode 100644 index 0000000000..e1cc98c2a1 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/BaseTransformer.java @@ -0,0 +1,189 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; + +/** + * Base processor which contains common functionality for processors that + * receive {@link FlowFile} and output {@link FlowFile} and contain only two + * {@link Relationship}s (i.e., success and failure). Every successful execution + * of + * {@link #doTransform(ProcessContext, ProcessSession, FlowFile, InvocationContextProperties)} + * operation will result in transferring {@link FlowFile} to 'success' + * relationship while any exception will result in such file going to 'failure'. + */ +public abstract class BaseTransformer extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Successfully retrieved schema from Schema Registry") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to find a schema are sent to this relationship") + .build(); + + private static final Set BASE_RELATIONSHIPS; + + static { + Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + BASE_RELATIONSHIPS = Collections.unmodifiableSet(relationships); + } + + private final Map propertyInstanceValues = new HashMap<>(); + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile != null) { + try { + InvocationContextProperties contextProperties = new InvocationContextProperties(context, flowFile); + flowFile = this.doTransform(context, session, flowFile, contextProperties); + session.transfer(flowFile, REL_SUCCESS); + } catch (Exception e) { + this.getLogger().error("Failed FlowFile processing, routing to failure. 
Issue: " + e.getMessage(), e); + session.transfer(flowFile, REL_FAILURE); + } + } else { + context.yield(); + } + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + List propertyDescriptors = this.getSupportedPropertyDescriptors(); + for (PropertyDescriptor propertyDescriptor : propertyDescriptors) { + if (!propertyDescriptor.isExpressionLanguageSupported()){ + this.propertyInstanceValues.put(propertyDescriptor, context.getProperty(propertyDescriptor).getValue()); + } + } + } + + /** + * + */ + protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) { + AtomicReference> attributeRef = new AtomicReference>(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + attributeRef.set(transform(in, null, contextProperties)); + } + }); + if (attributeRef.get() != null) { + flowFile = session.putAllAttributes(flowFile, attributeRef.get()); + } + return flowFile; + } + + @Override + public Set getRelationships() { + return BASE_RELATIONSHIPS; + } + + /** + * This operation is designed to allow sub-classes to provide + * implementations that read content of the provided {@link InputStream} and + * write content (same or different) it into the provided + * {@link OutputStream}. Both {@link InputStream} and {@link OutputStream} + * represent the content of the in/out {@link FlowFile}. The + * {@link OutputStream} can be null if no output needs to be written. + *

+ * The returned {@link Map} represents attributes that will be added to the + * outgoing FlowFile. + * + * + * @param in + * {@link InputStream} representing data to be transformed + * @param out + * {@link OutputStream} representing target stream to wrote + * transformed data. Can be null if no output needs to be + * written. + * @param contextProperties + * instance of {@link InvocationContextProperties} + */ + protected abstract Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties); + + /** + * Properties object that gathers the value of the + * {@link PropertyDescriptor} within the context of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} + * invocation. It maintains the knowledge of instance properties vs. + * invocation properties that the values of which are set by evaluating + * expression against the incoming {@link FlowFile}. + */ + public class InvocationContextProperties { + private final Map propertyInvocationValues = new HashMap<>(); + + InvocationContextProperties(ProcessContext context, FlowFile flowFile) { + List propertyDescriptors = BaseTransformer.this.getSupportedPropertyDescriptors(); + for (PropertyDescriptor propertyDescriptor : propertyDescriptors) { + if (propertyDescriptor.isExpressionLanguageSupported()) { + PropertyValue value = context.getProperty(propertyDescriptor) + .evaluateAttributeExpressions(flowFile); + this.propertyInvocationValues.put(propertyDescriptor, value.getValue()); + } + } + } + + /** + * Returns the value of the property within the context of + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)} + * invocation. + */ + public String getPropertyValue(PropertyDescriptor propertyDescriptor, boolean notNull) { + String propertyValue = propertyInstanceValues.containsKey(propertyDescriptor) + ? propertyInstanceValues.get(propertyDescriptor) + : propertyInvocationValues.get(propertyDescriptor); + if (notNull && propertyValue == null) { + throw new IllegalArgumentException("Property '" + propertyDescriptor + "' evaluatd to null"); + } + return propertyValue; + } + + @Override + public String toString() { + return "Instance: " + propertyInstanceValues + "; Invocation: " + propertyInvocationValues; + } + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java new file mode 100644 index 0000000000..bded6fa539 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/CSVUtils.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.StringWriter; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.BooleanNode; +import org.codehaus.jackson.node.DoubleNode; +import org.codehaus.jackson.node.IntNode; +import org.codehaus.jackson.node.LongNode; +import org.codehaus.jackson.node.TextNode; + +/** + * Various CSV related utility operations relevant to transforming contents of + * the {@link FlowFile} between CSV and AVRO formats. + */ +class CSVUtils { + /** + * Provides a {@link Validator} to ensure that provided value is a valid + * character. + */ + public static final Validator CHAR_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + // Allows special, escaped characters as input, which is then un-escaped and converted to a single character. + // Examples for special characters: \t (or \u0009), \f. + if (input.length() > 1) { + input = StringEscapeUtils.unescapeJava(input); + } + return new ValidationResult.Builder().subject(subject).input(input) + .explanation("Only non-null single characters are supported") + .valid(input.length() == 1 && input.charAt(0) != 0).build(); + } + }; + + public static GenericRecord read(InputStream record, char delimiter, Schema schema, char quoteChar) { + Record avroRecord = new GenericData.Record(schema); + String[] parsedRecord = parseFields(convertInputStreamToString(record), delimiter, quoteChar); + List fields = schema.getFields(); + if (parsedRecord.length != fields.size()) { + throw new IllegalStateException("Incompatible schema. Parsed fields count does not match the count of fields from schema. " + + "Schema: " + schema.toString(true) + "\n Record: " + record); + } + + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + Type type = field.schema().getType(); + updateRecord(field, type, parsedRecord[i], avroRecord); + } + return avroRecord; + } + + /** + * Parses provided record into fields using provided delimiter. The + * 'quoteChar' is used to ensure that if a delimiter char is in quotes it + * will not be parsed into a separate filed. 
+     */
+    public static String[] parseFields(String record, char delimiter, char quoteChar) {
+        List<String> result = new ArrayList<>();
+        int start = 0;
+        boolean inQuotes = false;
+        for (int i = 0; i < record.length(); i++) {
+            if (record.charAt(i) == quoteChar) {
+                inQuotes = !inQuotes;
+            }
+            boolean atLastChar = (i == record.length() - 1);
+            if (atLastChar) {
+                if (record.charAt(i) == delimiter) {
+                    // missing last column value, add NULL
+                    result.add(record.substring(start, i));
+                    result.add(null);
+                } else {
+                    result.add(record.substring(start));
+                }
+            } else if (record.charAt(i) == delimiter && !inQuotes) {
+                if (start == i) {
+                    // There is no value, so add NULL to indicate the absence of a value for this field.
+                    result.add(null);
+                } else {
+                    result.add(record.substring(start, i));
+                }
+                start = i + 1;
+            }
+        }
+        return result.toArray(new String[]{});
+    }
+
+    /**
+     * Writes {@link GenericRecord} as a CSV (delimited) record to the
+     * {@link OutputStream} using the provided delimiter.
+     */
+    public static void write(GenericRecord record, char delimiter, OutputStream out) {
+        List<Field> fields = record.getSchema().getFields();
+
+        String delimiterToUse = "";
+        try {
+            for (Field field : fields) {
+                out.write(delimiterToUse.getBytes(StandardCharsets.UTF_8));
+                Object fieldValue = record.get(field.name());
+                if (null == fieldValue) {
+                    out.write(new byte[0]);
+                } else {
+                    if (Type.BYTES == field.schema().getType()) {
+                        // need to create it from the ByteBuffer it is serialized as.
+                        // need to ensure the type is one of the logical ones we support and if so convert it.
+                        if (!"decimal".contentEquals(field.getProp("logicalType"))) {
+                            throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '"
+                                    + field.getProp("logicalType") + "' that is currently not supported.");
+                        }
+
+                        JsonNode rawPrecision = field.getJsonProp("precision");
+                        if (null == rawPrecision) {
+                            throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property");
+                        }
+                        int precision = rawPrecision.asInt();
+                        JsonNode rawScale = field.getJsonProp("scale");
+                        int scale = null == rawScale ? 0 : rawScale.asInt();
+
+                        // write out the decimal with the precision and scale.
+                        NumberFormat numberFormat = DecimalFormat.getInstance();
+                        numberFormat.setGroupingUsed(false);
+                        normalizeNumberFormat(numberFormat, scale, precision);
+                        final String rawValue = new String(((ByteBuffer) fieldValue).array());
+                        out.write(numberFormat.format(new BigDecimal(rawValue)).getBytes(StandardCharsets.UTF_8));
+                    } else {
+                        out.write(fieldValue.toString().getBytes(StandardCharsets.UTF_8));
+                    }
+                }
+                if (delimiterToUse.length() == 0) {
+                    delimiterToUse = String.valueOf(delimiter);
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to write AVRO record as CSV", e);
+        }
+    }
+
+    /**
+     * According to the Avro 1.7.7 spec, if a logical type is invalid (for
+     * example, a decimal with a scale greater than its precision), implementations
+     * should ignore the logical type and use the underlying Avro type.
+     */
+    private static void normalizeNumberFormat(NumberFormat numberFormat, int scale, int precision) {
+        if (scale < precision) {
+            // write out with the specified precision and scale.
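+            // Illustrative example (values assumed): with precision=10 and scale=3 the value
+            // 11234567.89 is formatted as "11234567.890"; when scale >= precision the format
+            // is left at its defaults, so the logical type is effectively ignored per the spec.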
+ numberFormat.setMaximumIntegerDigits(precision); + numberFormat.setMaximumFractionDigits(scale); + numberFormat.setMinimumFractionDigits(scale); + } + } + + /** + * + */ + private static String convertInputStreamToString(InputStream record) { + StringWriter writer = new StringWriter(); + try { + IOUtils.copy(record, writer, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new IllegalStateException("Failed to read InputStream into String", e); + } + return writer.toString(); + } + + /** + * + */ + private static ByteBuffer encodeLogicalType(final Field field, final String fieldValue) { + String logicalType = field.getProp("logicalType"); + if (!"decimal".contentEquals(logicalType)) { + throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '" + logicalType + + "' that is currently not supported."); + } + + JsonNode rawPrecision = field.getJsonProp("precision"); + if (null == rawPrecision) { + throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property"); + } + int precision = rawPrecision.asInt(); + JsonNode rawScale = field.getJsonProp("scale"); + int scale = null == rawScale ? 0 : rawScale.asInt(); + + NumberFormat numberFormat = DecimalFormat.getInstance(); + numberFormat.setGroupingUsed(false); + normalizeNumberFormat(numberFormat, scale, precision); + BigDecimal decimal = null == fieldValue ? new BigDecimal(retrieveDefaultFieldValue(field).asText()) : new BigDecimal(fieldValue); + return ByteBuffer.wrap(numberFormat.format(decimal).getBytes(StandardCharsets.UTF_8)); + } + + /** + * + */ + private static JsonNode retrieveDefaultFieldValue(Field field) { + JsonNode jsonNode = field.defaultValue(); + if (null == jsonNode) { + throw new IllegalArgumentException("The field '" + field.name() + "' is NULL and there is no default value supplied in the Avro Schema"); + } + return jsonNode; + } + + /** + * + */ + private static void updateRecord(Field field, Type type, String providedValue, Record avroRecord) { + if (Type.NULL != type) { + Object value; + if (Type.INT == type) { + value = null == providedValue ? possiblyGetDefaultValue(field, IntNode.class).getIntValue() + : Integer.parseInt(providedValue); + avroRecord.put(field.name(), value); + } else if (Type.BOOLEAN == type) { + value = null == providedValue + ? possiblyGetDefaultValue(field, BooleanNode.class).getBooleanValue() + : Boolean.parseBoolean(providedValue); + avroRecord.put(field.name(), value); + } else if (Type.DOUBLE == type) { + value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue() + : Double.parseDouble(providedValue); + avroRecord.put(field.name(), value); + } else if (Type.FLOAT == type) { + value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue() + : Float.parseFloat(providedValue); + avroRecord.put(field.name(), value); + } else if (Type.LONG == type) { + value = null == providedValue ? possiblyGetDefaultValue(field, LongNode.class).getLongValue() + : Long.parseLong(providedValue); + avroRecord.put(field.name(), value); + } else if (Type.STRING == type) { + value = null == providedValue ? 
possiblyGetDefaultValue(field, TextNode.class).getTextValue() + : providedValue; + avroRecord.put(field.name(), value); + } else if (Type.BYTES == type) { + value = encodeLogicalType(field, providedValue); + avroRecord.put(field.name(), value); + } else if (Type.UNION == type) { + field.schema().getTypes() + .forEach(schema -> updateRecord(field, schema.getType(), providedValue, avroRecord)); + } else if (Type.ARRAY == type || Type.ENUM == type || Type.FIXED == type || Type.MAP == type + || Type.NULL == type || Type.RECORD == type) { + throw new IllegalArgumentException("The field type '" + type + "' is not supported at the moment"); + } else { + avroRecord.put(field.name(), providedValue); + } + } + } + + /** + * Check to see if there is a default value to use, if not will throw + * {@link IllegalArgumentException} + */ + private static JsonNode possiblyGetDefaultValue(Field field, Class expectedDefaultType) { + JsonNode jsonNode = retrieveDefaultFieldValue(field); + if (field.schema().getType() != Type.UNION && !expectedDefaultType.isAssignableFrom(jsonNode.getClass())) { + // since we do not support schema evolution here we need to throw an + // exception here as the data is in error. + throw new IllegalArgumentException("The field '" + field.name() + "' has a default value that " + + "does not match the field type. Field Type is: '" + expectedDefaultType.getName() + "' and the " + + "default value type is: '" + field.defaultValue().toString()); + } + return jsonNode; + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java new file mode 100644 index 0000000000..2ab83c5f37 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/ExtractAvroFields.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; + +@Tags({ "registry", "schema", "avro", "extract", "evaluate" }) +@CapabilityDescription("Extracts Avro field and assigns it to the FlowFile attribute") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@DynamicProperty(name = "Avro field name", value = "FlowFile attribute name to set the extracted field", + description = "The value of the Avro field specified by 'Avro field name' will be extracted and set as " + + "FlowFile attribute under name specified by the value of this property.") +public final class ExtractAvroFields extends AbstractTransformer { + + private static final List DESCRIPTORS; + + static { + List descriptors = new ArrayList(); + descriptors.addAll(BASE_DESCRIPTORS); + descriptors.add(SCHEMA_TYPE); + DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + private volatile Map dynamicProperties; + + /** + * + */ + @Override + protected List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + this.dynamicProperties = context.getProperties().entrySet().stream() + .filter(p -> p.getKey().isDynamic()) + .collect(Collectors.toMap(p -> p.getKey().getName(), p -> p.getValue())); + } + + /** + * + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .required(false) + .dynamic(true) + .build(); + } + + /** + * + */ + @Override + protected Map transform(InputStream in, InvocationContextProperties contextProperties, Schema schema) { + GenericRecord avroRecord = AvroUtils.read(in, schema); + Map attributes = this.dynamicProperties.entrySet().stream().collect( + Collectors.toMap(dProp -> dProp.getValue(), dProp -> String.valueOf(avroRecord.get(dProp.getKey())))); + return Collections.unmodifiableMap(attributes); + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java new file mode 100644 index 0000000000..81c98b38b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/JsonUtils.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.io.JsonEncoder;
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Various JSON-related utility operations relevant to transforming contents of
+ * the {@link FlowFile} between JSON and AVRO formats.
+ */
+class JsonUtils {
+
+    /**
+     * Writes the provided {@link GenericRecord} into the provided
+     * {@link OutputStream} as JSON.
+     */
+    public static void write(GenericRecord record, OutputStream out) {
+        try {
+            DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
+            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), out);
+            writer.write(record, encoder);
+            encoder.flush();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to write GenericRecord as JSON", e);
+        }
+    }
+
+    /**
+     * Reads the provided {@link InputStream} as JSON into an Avro
+     * {@link GenericRecord}, applying the provided {@link Schema} and returning
+     * the resulting GenericRecord.
+     */
+    public static GenericRecord read(InputStream jsonIs, Schema schema) {
+        DataInputStream din = new DataInputStream(jsonIs);
+        try {
+            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
+            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+            return reader.read(null, decoder);
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to parse incoming JSON input stream into an Avro GenericRecord. "
+                    + "Possible reasons: the value may not be valid JSON or an incompatible schema was provided. Schema was '"
+                    + schema.toString(true) + "'.", e);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
new file mode 100644
index 0000000000..3fc15303a5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/RegistryCommon.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import org.apache.avro.Schema; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schemaregistry.processors.BaseTransformer.InvocationContextProperties; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; + +/** + * Strategy that encapsulates common properties and functionalities used by all + * processors that integrate with Schema Registry. + */ +interface RegistryCommon { + + static final String SCHEMA_ATTRIBUTE_NAME = "schema.text"; + + static final PropertyDescriptor REGISTRY_SERVICE = new PropertyDescriptor.Builder() + .name("schema-registry-service") + .displayName("Schema Registry Service") + .description("The Schema Registry Service for serializing/deserializing messages as well as schema retrieval.") + .required(true) + .identifiesControllerService(SchemaRegistry.class) + .build(); + + static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder() + .name("schema-name") + .displayName("Schema Name") + .description("The name of schema.") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor SCHEMA_TYPE = new PropertyDescriptor.Builder() + .name("schema-type") + .displayName("Schema Type") + .description("The type of schema (avro is the the only current supported schema).") + .required(true) + .allowableValues("avro") + .defaultValue("avro") + .expressionLanguageSupported(true) + .build(); + + static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder() + .name("csv-delimiter") + .displayName("CSV delimiter") + .description("Delimiter character for CSV records") + .addValidator(CSVUtils.CHAR_VALIDATOR) + .defaultValue(",") + .build(); + + static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder() + .name("csv-quote-character") + .displayName("CSV quote character") + .description("Quote character for CSV values") + .addValidator(CSVUtils.CHAR_VALIDATOR) + .defaultValue("\"") + .build(); + /** + * Utility operation to retrieve and parse {@link Schema} from Schema + * Registry using provided {@link SchemaRegistry}; + */ + static Schema retrieveSchema(SchemaRegistry schemaRegistry, InvocationContextProperties contextProperties) { + String schemaName = contextProperties.getPropertyValue(SCHEMA_NAME, true); + String schemaText = schemaRegistry.retrieveSchemaText(schemaName); + return new Schema.Parser().parse(schemaText); + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java new file mode 100644 index 0000000000..aa0d4183e5 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToCSV.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.processors;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+
+@Tags({ "registry", "schema", "avro", "csv", "transform" })
+@CapabilityDescription("Transforms AVRO content of the Flow File to CSV using the schema provided by the Schema Registry Service.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public final class TransformAvroToCSV extends AbstractCSVTransformer {
+
+    /**
+     *
+     */
+    @Override
+    protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
+        byte[] buff;
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            IOUtils.copy(in, bos);
+            buff = bos.toByteArray();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to buffer incoming AVRO content", e);
+        }
+        ByteArrayInputStream is = new ByteArrayInputStream(buff);
+        GenericRecord avroRecord = AvroUtils.read(is, schema);
+        CSVUtils.write(avroRecord, this.delimiter, out);
+        return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv");
+    }
+}
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java
new file mode 100644
index 0000000000..ba45563e50
--- /dev/null
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformAvroToJson.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + + +@Tags({ "registry", "schema", "avro", "json", "transform" }) +@CapabilityDescription("Transforms AVRO content of the Flow File to JSON using the schema provided by the Schema Registry Service.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public final class TransformAvroToJson extends AbstractContentTransformer { + + /** + * + */ + @Override + protected Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + GenericRecord avroRecord = AvroUtils.read(in, schema); + JsonUtils.write(avroRecord, out); + return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json"); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java new file mode 100644 index 0000000000..f44e440860 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToAvro.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; + +@Tags({ "csv", "avro", "transform", "registry", "schema" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Transforms CSV content of the Flow File to Avro using the schema provided by the Schema Registry Service.") +public final class TransformCSVToAvro extends AbstractCSVTransformer { + + private static final List DESCRIPTORS; + + static { + List descriptors = new ArrayList(); + descriptors.addAll(BASE_CSV_DESCRIPTORS); + descriptors.add(QUOTE); + DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + private volatile char quoteChar; + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0); + } + + /** + * + */ + @Override + public List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * + */ + @Override + protected Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar); + AvroUtils.write(avroRecord, out); + return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro"); + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java new file mode 100644 index 0000000000..2ce9fbee7d --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformCSVToJson.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; + +@Tags({ "csv", "json", "transform", "registry", "schema" }) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Transforms CSV content of the Flow File to JSON using the schema provided by the Schema Registry Service.") +public final class TransformCSVToJson extends AbstractCSVTransformer { + + private static final List DESCRIPTORS; + + static { + List descriptors = new ArrayList(); + descriptors.addAll(BASE_CSV_DESCRIPTORS); + descriptors.add(QUOTE); + DESCRIPTORS = Collections.unmodifiableList(descriptors); + } + + private volatile char quoteChar; + + /** + * + */ + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0); + } + + /** + * + */ + @Override + public List getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * + */ + @Override + protected Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar); + JsonUtils.write(avroRecord, out); + return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json"); + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java new file mode 100644 index 0000000000..f54a4b5b2b --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToAvro.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +@Tags({ "registry", "schema", "avro", "json", "transform" }) +@CapabilityDescription("Transforms JSON content of the Flow File to Avro using the schema provided by the Schema Registry Service.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public final class TransformJsonToAvro extends AbstractContentTransformer { + + /** + * + */ + @Override + protected Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + GenericRecord avroRecord = JsonUtils.read(in, schema); + AvroUtils.write(avroRecord, out); + return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro"); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java new file mode 100644 index 0000000000..c026570313 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/java/org/apache/nifi/schemaregistry/processors/TransformJsonToCSV.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.attributes.CoreAttributes; + +@Tags({ "registry", "schema", "csv", "json", "transform" }) +@CapabilityDescription("Transforms JSON content of the Flow File to CSV using the schema provided by the Schema Registry Service.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +public final class TransformJsonToCSV extends AbstractCSVTransformer { + + /** + * + */ + @Override + protected Map transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) { + GenericRecord avroRecord = JsonUtils.read(in, schema); + CSVUtils.write(avroRecord, this.delimiter, out); + return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv"); + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..0bb067e2b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
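+#
+# Each fully-qualified class name listed below is picked up through Java's
+# standard ServiceLoader mechanism; a processor in this bundle is only
+# discoverable by NiFi if it has an entry in this file.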
+org.apache.nifi.schemaregistry.processors.TransformJsonToAvro +org.apache.nifi.schemaregistry.processors.TransformAvroToJson +org.apache.nifi.schemaregistry.processors.TransformCSVToAvro +org.apache.nifi.schemaregistry.processors.TransformCSVToJson +org.apache.nifi.schemaregistry.processors.TransformAvroToCSV +org.apache.nifi.schemaregistry.processors.TransformJsonToCSV +org.apache.nifi.schemaregistry.processors.ExtractAvroFields \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java new file mode 100644 index 0000000000..dabbc17a3b --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/java/org/apache/nifi/schemaregistry/processors/TransformersTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.processors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.junit.Test; +import org.junit.runner.RunWith; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + +@RunWith(JUnitParamsRunner.class) +public class TransformersTest { + + private final ClassLoader classLoader = getClass().getClassLoader(); + + @Test + public void validateCSVtoAvroPair() throws Exception { + String data = "John Dow|13|blue"; + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " + + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " + + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; + + Schema schema = new Schema.Parser().parse(fooSchemaText); + + // CSV -> AVRO -> CSV + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); + GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + AvroUtils.write(record, out); + byte[] avro = out.toByteArray(); + + in = new ByteArrayInputStream(avro); + record = AvroUtils.read(in, schema); + out = new ByteArrayOutputStream(); + CSVUtils.write(record, '|', out); + byte[] csv = out.toByteArray(); + assertEquals(data, new String(csv, StandardCharsets.UTF_8)); + } + + @Test + public void validateCSVtoJsonPair() throws Exception { + String data = "John Dow|13|blue"; + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " + + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " + + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; + + Schema schema = new Schema.Parser().parse(fooSchemaText); + + // CSV -> JSON -> CSV + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); + GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + JsonUtils.write(record, out); + byte[] json = out.toByteArray(); + + assertEquals("{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}", new String(json, StandardCharsets.UTF_8)); + + in = new ByteArrayInputStream(json); + record = JsonUtils.read(in, schema); + out = new ByteArrayOutputStream(); + CSVUtils.write(record, '|', out); + byte[] csv = out.toByteArray(); + assertEquals(data, new String(csv, StandardCharsets.UTF_8)); + } + + @Test + public void validateJsonToAvroPair() throws Exception { + String data = "{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}"; + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, " + + "{\"name\": \"favorite_number\", \"type\": \"int\"}, " + + "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}"; + + Schema schema = new Schema.Parser().parse(fooSchemaText); + + // JSON -> AVRO -> JSON + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); + GenericRecord 
record = JsonUtils.read(in, schema); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + AvroUtils.write(record, out); + byte[] avro = out.toByteArray(); + + in = new ByteArrayInputStream(avro); + record = AvroUtils.read(in, schema); + out = new ByteArrayOutputStream(); + JsonUtils.write(record, out); + byte[] csv = out.toByteArray(); + assertEquals(data, new String(csv, StandardCharsets.UTF_8)); + } + + @Test + @Parameters({"input_csv/union_null_last_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_last_field_with_default.txt", + "input_csv/union_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_with_default.txt", + "input_csv/union_null_middle_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_middle_field_with_default.txt", + "input_csv/primitive_types.txt,input_avro/primitive_types_no_defaults.txt,expected_ouput_csv/primitive_types.txt", + "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_matching_default.txt,expected_ouput_csv/primitive_types_with_matching_default.txt", + "input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt", + "input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt", + "input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt", + "input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_with_default.txt"}) + public void testCSVRoundtrip(final String inputCSVFileName, final String inputAvroSchema, final String expectedOuput) throws Exception { + + final String data = getResourceAsString(inputCSVFileName); + final String schemaText = getResourceAsString(inputAvroSchema); + final String result = getResourceAsString(expectedOuput); + + Schema schema = new Schema.Parser().parse(schemaText); + + + // CSV -> AVRO -> CSV + + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); + GenericRecord record = CSVUtils.read(in, '|', schema, '\"'); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + AvroUtils.write(record, out); + byte[] avro = out.toByteArray(); + + in = new ByteArrayInputStream(avro); + record = AvroUtils.read(in, schema); + out = new ByteArrayOutputStream(); + CSVUtils.write(record, '|', out); + byte[] csv = out.toByteArray(); + assertEquals(result, new String(csv, StandardCharsets.UTF_8)); + + } + + @Test + @Parameters({"input_csv/union_with_missing_value.txt,input_avro/union_and_mismatch_defaults.txt", + "input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_mismatch_default.txt"}) + public void testCSVMismatchDefaults(final String inputCSVFileName, final String inputAvroSchema) { + + try { + final String data = getResourceAsString(inputCSVFileName); + + final String schemaText = getResourceAsString(inputAvroSchema); + + Schema schema = new Schema.Parser().parse(schemaText); + + ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes()); + CSVUtils.read(in, '|', schema, '\"'); + }catch (IOException ioe){ + assertTrue(false); + }catch(IllegalArgumentException iae){ + assertTrue(true); + } + + } + + /** + * Simple wrapper around getting the test 
resource file that is used by the above test cases + * + * @param fileName - the filename of the file to read + * @return A string that contains the body of the file. + * @throws IOException - if an error occurs reading the file. + */ + private String getResourceAsString(String fileName) throws IOException { + return new String(Files.readAllBytes(FileSystems.getDefault().getPath(classLoader.getResource(fileName).getPath()))); + } + +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt new file mode 100644 index 0000000000..1a53f858bb --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType.txt @@ -0,0 +1 @@ +"fake_transactionid"|11234567.890 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt new file mode 100644 index 0000000000..9506ad45f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_invalid_scale.txt @@ -0,0 +1 @@ +"fake_transactionid"|11234567.89 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt new file mode 100644 index 0000000000..2309e719f4 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt @@ -0,0 +1 @@ +"fake_transactionid"|0.000 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt new file mode 100644 index 0000000000..3a9689c79e --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/decimal_logicalType_with_default.txt @@ -0,0 +1 @@ +"fake_transactionid"|0 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt new file mode 100644 index 0000000000..77f353fcfa --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types.txt @@ -0,0 +1 @@ +"this is a simple string."|10|21474836470|1.7976931348623157E308|true \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt new file mode 100644 index 0000000000..095f81e197 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/primitive_types_with_matching_default.txt @@ -0,0 +1 @@ +"default_string"|1234|21474836470|1.7976931348623157E308|true \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt new file mode 100644 index 0000000000..83cbf754a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_last_field_with_default.txt @@ -0,0 +1 @@ +andrew|13| \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt new file mode 100644 index 0000000000..1b03c97bf3 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_null_middle_field_with_default.txt @@ -0,0 +1 @@ +andrew|21474|blue \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt new file mode 100644 index 0000000000..9c7abb59df --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/expected_ouput_csv/union_with_default.txt @@ -0,0 +1 @@ +andrew|13|blue \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt new file mode 100644 index 0000000000..54ba8b16e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_invalid_scale_with_default.txt @@ -0,0 +1,16 @@ +{ + "name": "trx_table", + "type": "record", + "fields": [ + { + "name": "transactionid", + "type": ["string", "null"] + }, { + "name": "amount", + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 13, + "default": 0.0 + }] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt new file mode 100644 index 0000000000..8385fb1ad9 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_default.txt @@ -0,0 +1,16 @@ +{ + "name": "trx_table", + "type": "record", + "fields": [ + { + "name": "transactionid", + "type": 
["string", "null"] + }, { + "name": "amount", + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 3, + "default": 0.0 + }] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt new file mode 100644 index 0000000000..9878590dbb --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/decimal_logicalType_valid_scale_with_no_default.txt @@ -0,0 +1,15 @@ +{ + "name": "trx_table", + "type": "record", + "fields": [ + { + "name": "transactionid", + "type": ["string", "null"] + }, { + "name": "amount", + "type": "bytes", + "logicalType": "decimal", + "precision": 10, + "scale": 3 + }] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt new file mode 100644 index 0000000000..934a53c2b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_no_defaults.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":"string"}, + {"name":"integer","type":"int"}, + {"name":"long","type":"long"}, + {"name":"double","type":"double"}, + {"name":"boolean","type":"boolean"} + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt new file mode 100644 index 0000000000..abc80caffd --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_union_with_defaults.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":["null","string"],"default":null}, + {"name":"integer","type":["null","int"],"default":null}, + {"name":"long","type":["null","long"],"default":null}, + {"name":"double","type":["null","double"],"default":null}, + {"name":"boolean","type":["null","boolean"],"default":null} + ] + } diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt new file mode 100644 index 0000000000..b3ea95131f --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_matching_default.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":"string","default":"default_string"}, + {"name":"integer","type":"int","default":1234}, + {"name":"long","type":"long","default":21474836470}, + {"name":"double","type":"double","default":1.7976931348623157E308}, + {"name":"boolean","type":"boolean","default":true} + ] + } \ No newline at end of file diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt new file mode 100644 index 0000000000..e8f0e285d8 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/primitive_types_with_mismatch_default.txt @@ -0,0 +1,11 @@ +{ + "type":"record", + "name":"basic_primitive_type_check", + "fields":[ + {"name":"string","type":"string","default":1234}, + {"name":"integer","type":"int","default":"mismatch_int"}, + {"name":"long","type":"long","default":"mismatch_long"}, + {"name":"double","type":"double","default":"mismatch_double"}, + {"name":"boolean","type":"boolean","default":"mismatch_boolean"} + ] + } diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt new file mode 100644 index 0000000000..442a3a45bc --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_matching_defaults.txt @@ -0,0 +1,18 @@ +{ + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [{ + "name": "name", + "type": "string", + "default": "default_name" + }, { + "name": "favorite_number", + "type": "int", + "default": 21474 + }, { + "name": "favorite_color", + "type": ["null", "string"], + "default": null + }] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt new file mode 100644 index 0000000000..5222074858 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_avro/union_and_mismatch_defaults.txt @@ -0,0 +1,18 @@ +{ + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [{ + "name": "name", + "type": "string", + "default": "default_name" + }, { + "name": "favorite_number", + "type": "int", + "default": "mismatched_int_default" + }, { + "name": "favorite_color", + "type": ["null", "string"], + "default": null + }] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt new file mode 100644 index 0000000000..1a53f858bb --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType.txt @@ -0,0 +1 @@ +"fake_transactionid"|11234567.890 \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt new file mode 100644 index 0000000000..1ee2a9b8b0 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/decimal_logicalType_missing_value.txt @@ -0,0 +1 @@ 
+"fake_transactionid"| \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt new file mode 100644 index 0000000000..77f353fcfa --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types.txt @@ -0,0 +1 @@ +"this is a simple string."|10|21474836470|1.7976931348623157E308|true \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt new file mode 100644 index 0000000000..b60c01bebe --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/primitive_types_with_matching_default.txt @@ -0,0 +1 @@ +"default_string"||21474836470||true \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt new file mode 100644 index 0000000000..83cbf754a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_last_field_with_default.txt @@ -0,0 +1 @@ +andrew|13| \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt new file mode 100644 index 0000000000..5a706ac189 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_null_middle_field_with_default.txt @@ -0,0 +1 @@ +andrew||blue \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt new file mode 100644 index 0000000000..9c7abb59df --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_default.txt @@ -0,0 +1 @@ +andrew|13|blue \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt new file mode 100644 index 0000000000..5a706ac189 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-processors/src/test/resources/input_csv/union_with_missing_value.txt @@ -0,0 +1 @@ +andrew||blue \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml new file mode 100644 index 0000000000..d99dc64598 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml @@ -0,0 +1,48 @@ + + + + 4.0.0 + + + 
org.apache.nifi + nifi-registry-bundle + 1.2.0-SNAPSHOT + + + nifi-registry-service + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-utils + + + org.apache.commons + commons-lang3 + + + commons-io + commons-io + + + junit + junit + test + + + diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java new file mode 100644 index 0000000000..fd5d0c5739 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Properties; + +import org.apache.nifi.controller.ControllerService; + +/** + * Represents a {@link ControllerService} strategy for exposing an internal and/or + * integrating with an external Schema Registry. + */ +public interface SchemaRegistry extends ControllerService, AutoCloseable { + + public static final String SCHEMA_NAME_ATTR = "schema.name"; + + + /** + * Retrieves and returns the textual representation of the schema based on + * the provided name of the schema available in the Schema Registry. Throws + * a runtime exception if the schema cannot be found. + */ + String retrieveSchemaText(String schemaName); + + /** + * Retrieves and returns the textual representation of the schema based on + * the provided name of the schema available in the Schema Registry and optional + * additional attributes. Throws a runtime exception if the schema cannot + * be found. + */ + String retrieveSchemaText(String schemaName, Properties attributes); +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java new file mode 100644 index 0000000000..aaedea245a --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistry.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
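SchemaRegistry is the contract the transform processors program against: resolve the schema text by name, then parse it. The sketch below shows that call sequence from a processor's point of view; only SCHEMA_NAME_ATTR ("schema.name") and retrieveSchemaText(..) come from the interface above, while the SchemaLookup class and its SCHEMA_REGISTRY property descriptor are hypothetical stand-ins for what a processor in this bundle would declare.

    import org.apache.avro.Schema;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.schemaregistry.services.SchemaRegistry;

    final class SchemaLookup {

        // Hypothetical descriptor for illustration; the bundle's processors declare their own equivalent.
        static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
                .name("Schema Registry")
                .identifiesControllerService(SchemaRegistry.class)
                .required(true)
                .build();

        static Schema lookUp(ProcessContext context, FlowFile flowFile) {
            SchemaRegistry registry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
            String schemaName = flowFile.getAttribute(SchemaRegistry.SCHEMA_NAME_ATTR); // "schema.name"
            String schemaText = registry.retrieveSchemaText(schemaName); // throws if the schema is unknown
            return new Schema.Parser().parse(schemaText);
        }
    }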
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schemaregistry.services; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +@Tags({ "schema", "registry", "avro", "json", "csv" }) +@CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema " + + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + + "representation of the actual schema.") +public class SimpleKeyValueSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + + private static final List<PropertyDescriptor> propertyDescriptors; + + static { + propertyDescriptors = Collections.emptyList(); + } + + private final Map<String, String> schemaNameToSchemaMap; + + public SimpleKeyValueSchemaRegistry() { + this.schemaNameToSchemaMap = new HashMap<>(); + } + + @OnEnabled + public void enable(ConfigurationContext configurationContext) throws InitializationException { + this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream() + .filter(propEntry -> propEntry.getKey().isDynamic()) + .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue()))); + } + + /** + * + */ + @Override + public String retrieveSchemaText(String schemaName) { + if (!this.schemaNameToSchemaMap.containsKey(schemaName)) { + throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + "'."); + } else { + return this.schemaNameToSchemaMap.get(schemaName); + } + } + + @Override + public String retrieveSchemaText(String schemaName, Properties attributes) { + throw new UnsupportedOperationException("This version of the schema registry does not " + + "support this operation, since schemas are only identified by name."); + } + + @Override + @OnDisabled + public void close() throws Exception { + this.schemaNameToSchemaMap.clear(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true) + .build(); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors; + } +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..1775b765db --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java new file mode 100644 index 0000000000..29179abf3f --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/SimpleKeyValueSchemaRegistryTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
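The META-INF/services entry above is a standard Java service-provider declaration. NiFi's NAR class loading adds its own isolation on top, so this is only an approximation, but plain ServiceLoader discovery would locate the implementation roughly as follows:

    import java.util.ServiceLoader;

    import org.apache.nifi.controller.ControllerService;

    public class ServiceDiscoveryCheck {
        public static void main(String[] args) {
            // Iterates the providers listed in META-INF/services/org.apache.nifi.controller.ControllerService.
            for (ControllerService service : ServiceLoader.load(ControllerService.class)) {
                System.out.println(service.getClass().getName());
                // expected to include org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry
                // when nifi-registry-service is on the classpath
            }
        }
    }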
+ */ +package org.apache.nifi.schemaregistry.services; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry; +import org.junit.Test; + +public class SimpleKeyValueSchemaRegistryTest { + + @Test + public void validateSchemaRegistrationFromDynamicProperties() throws Exception { + String schemaName = "fooSchema"; + ConfigurationContext configContext = mock(ConfigurationContext.class); + Map<PropertyDescriptor, String> properties = new HashMap<>(); + PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() + .name(schemaName) + .dynamic(true) + .build(); + String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", " + + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, " + + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, " + + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}"; + PropertyDescriptor barSchema = new PropertyDescriptor.Builder() + .name("barSchema") + .dynamic(false) + .build(); + properties.put(fooSchema, fooSchemaText); + properties.put(barSchema, ""); + when(configContext.getProperties()).thenReturn(properties); + SchemaRegistry delegate = new SimpleKeyValueSchemaRegistry(); + ((SimpleKeyValueSchemaRegistry)delegate).enable(configContext); + + String locatedSchemaText = delegate.retrieveSchemaText(schemaName); + assertEquals(fooSchemaText, locatedSchemaText); + try { + locatedSchemaText = delegate.retrieveSchemaText("barSchema"); + fail(); + } catch (Exception e) { + // ignore + } + delegate.close(); + } + +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/pom.xml new file mode 100644 index 0000000000..fa6d30f7a9 --- /dev/null +++ b/nifi-nar-bundles/nifi-registry-bundle/pom.xml @@ -0,0 +1,42 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 1.2.0-SNAPSHOT + + nifi-registry-bundle + pom + A bundle of processors that rely on an external service to obtain schemas. + + + 3.0 + + + + nifi-registry-processors + nifi-registry-service + nifi-registry-nar + + + + + org.apache.nifi + nifi-registry-processors + 1.2.0-SNAPSHOT + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index f0164aff6f..02785d5a9c 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -74,6 +74,7 @@ nifi-websocket-bundle nifi-tcp-bundle nifi-gcp-bundle + nifi-registry-bundle diff --git a/pom.xml b/pom.xml index c968c6f967..a07e7ff0c5 100644 --- a/pom.xml +++ b/pom.xml @@ -1144,6 +1144,12 @@ language governing permissions and limitations under the License. --> 1.2.0-SNAPSHOT nar + + org.apache.nifi + nifi-registry-nar + 1.2.0-SNAPSHOT + nar + org.apache.nifi nifi-mqtt-nar
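Once a schema has been resolved from the registry (as in the test above), the transform processors in this bundle convert flow file content against it. The following plain-Avro sketch decodes a JSON document into a GenericRecord using a User-style record similar to the fixtures; it uses only the standard Avro 1.7 API and is not the bundle's JsonUtils. Note that Avro's JSON encoding requires union values to be wrapped with their branch type.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.JsonDecoder;

    public class JsonToAvroSketch {
        public static void main(String[] args) throws IOException {
            // Illustrative schema, similar in shape to the User fixtures above.
            String schemaText = "{\"namespace\":\"example.avro\",\"type\":\"record\",\"name\":\"User\",\"fields\":["
                    + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]},"
                    + "{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},"
                    + "{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]}]}";
            Schema schema = new Schema.Parser().parse(schemaText);

            // Avro's JSON encoding wraps each union value with its branch type.
            String json = "{\"name\":{\"string\":\"andrew\"},\"favorite_number\":{\"int\":13},\"favorite_color\":{\"string\":\"blue\"}}";
            InputStream in = new ByteArrayInputStream(json.getBytes("UTF-8"));

            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, in);
            GenericRecord record = reader.read(null, decoder);
            System.out.println(record); // prints the decoded record, e.g. {"name": "andrew", "favorite_number": 13, "favorite_color": "blue"}
        }
    }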