From 3906d4e1d215f41cdf075ac23a63e9e740391b33 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 22 Jun 2017 14:01:32 -0400 Subject: [PATCH] NIFI-1763: Initial implementation of ConfluentSchemaRegistry. NIFI-1763: Fixed bug where the Confluent Schema Registry Schema Access Writer was not being created Signed-off-by: Yolanda M. Davis This closes #1938 --- nifi-assembly/pom.xml | 5 + .../nifi-confluent-platform-nar/pom.xml | 36 +++ .../src/main/resource/META-INF/LICENSE | 262 ++++++++++++++++++ .../src/main/resource/META-INF/NOTICE | 125 +++++++++ .../pom.xml | 69 +++++ .../ConfluentSchemaRegistry.java | 207 ++++++++++++++ .../schemaregistry/MultipleURLValidator.java | 51 ++++ .../client/CachingSchemaRegistryClient.java | 129 +++++++++ .../client/RestSchemaRegistryClient.java | 181 ++++++++++++ .../client/SchemaRegistryClient.java | 30 ++ ...g.apache.nifi.controller.ControllerService | 16 ++ .../nifi-confluent-platform-bundle/pom.xml | 27 ++ .../nifi/schema/access/SchemaAccessUtils.java | 61 ++-- .../ConfluentSchemaRegistryStrategy.java | 74 +++++ .../access/ConfluentSchemaRegistryWriter.java | 77 +++++ nifi-nar-bundles/nifi-registry-bundle/pom.xml | 9 - .../HortonworksSchemaRegistry.java | 6 - .../java/org/apache/nifi/csv/CSVUtils.java | 7 +- .../SchemaRegistryRecordSetWriter.java | 12 +- .../serialization/SchemaRegistryService.java | 3 +- nifi-nar-bundles/pom.xml | 1 + pom.xml | 13 +- 22 files changed, 1334 insertions(+), 67 deletions(-) create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/LICENSE create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml create mode 100644 
nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/MultipleURLValidator.java create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService create mode 100644 nifi-nar-bundles/nifi-confluent-platform-bundle/pom.xml create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index e1ac6cbcc6..7252697d71 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -186,6 +186,11 @@ nifi-kafka-0-10-nar nar + + org.apache.nifi + nifi-confluent-platform-nar + nar + org.apache.nifi nifi-http-context-map-nar diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/pom.xml 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/pom.xml new file mode 100644 index 0000000000..b8a9b74fb6 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-confluent-platform-bundle + 1.4.0-SNAPSHOT + + + nifi-confluent-platform-nar + nar + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-confluent-schema-registry-service + + + diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/LICENSE b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/LICENSE new file mode 100644 index 0000000000..88113d5ca4 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/LICENSE @@ -0,0 +1,262 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + + + nifi-confluent-platform-bundle includes subcomponents with separate copyright notices and + license terms. Your use of these subcomponents is subject to the terms + and conditions of the following licenses: + + The binary distribution of this product bundles 'ParaNamer' and 'Paranamer Core' + which is available under a BSD style license. + + Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions + are met: + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + 3. Neither the name of the copyright holders nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + THE POSSIBILITY OF SUCH DAMAGE. + + + The binary distribution of this product bundles 'Bouncy Castle JDK 1.5' + under an MIT style license. + + Copyright (c) 2000 - 2015 The Legion of the Bouncy Castle Inc. (http://www.bouncycastle.org) + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. 
\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/NOTICE b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/NOTICE new file mode 100644 index 0000000000..515c56997f --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-platform-nar/src/main/resource/META-INF/NOTICE @@ -0,0 +1,125 @@ +nifi-confluent-platform-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). +=========================================== +Apache Software License v2 +=========================================== + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Jackson JSON processor + The following NOTICE information applies: + # Jackson JSON processor + + Jackson is a high-performance, Free/Open Source JSON processing library. + It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has + been in development since 2007. + It is currently developed by a community of developers, as well as supported + commercially by FasterXML.com. + + ## Licensing + + Jackson core and extension components may licensed under different licenses. + To find the details that apply to this artifact see the accompanying LICENSE file. + For more information, including possible other licensing options, contact + FasterXML.com (http://fasterxml.com). + + ## Credits + + A list of contributors may be found from CREDITS file, which is included + in some artifacts (usually source distributions); but is always available + from the source code management (SCM) system project uses. 
+ + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2017 The Apache Software Foundation + + (ASLv2) Snappy Java + The following NOTICE information applies: + This product includes software developed by Google + Snappy: http://code.google.com/p/snappy/ (New BSD License) + + This product includes software developed by Apache + PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/ + (Apache 2.0 license) + + This library containd statically linked libstdc++. This inclusion is allowed by + "GCC RUntime Library Exception" + http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html + + (ASLv2) Apache Commons Compress + The following NOTICE information applies: + Apache Commons Compress + Copyright 2002-2017 The Apache Software Foundation + + The files in the package org.apache.commons.compress.archivers.sevenz + were derived from the LZMA SDK, version 9.20 (C/ and CPP/7zip/), + which has been placed in the public domain: + + "LZMA SDK is placed in the public domain." (http://www.7-zip.org/sdk.html) + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons Codec + The following NOTICE information applies: + Apache Commons Codec + Copyright 2002-2014 The Apache Software Foundation + + src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java + contains test data from http://aspell.net/test/orig/batch0.tab. + Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org) + + =============================================================================== + + The content of package org.apache.commons.codec.language.bm has been translated + from the original php source code available at http://stevemorse.org/phoneticinfo.htm + with permission from the original authors. 
+ Original source copyright: + Copyright (c) 2008 Alexander Beider & Stephen P. Morse. + + (ASLv2) Jettison + The following NOTICE information applies: + Copyright 2006 Envoi Solutions LLC + + + +***************** +Public Domain +***************** + +The following binary components are provided to the 'Public Domain'. See project link for details. + + (Public Domain) XZ for Java (org.tukaani:xz:jar:1.5 - http://tukaani.org/xz/java.html) + + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) jersey-core-client (org.glassfish.jersey.core:jersey-client:jar:2.19 - https://jersey.java.net/jersey-client/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-client (com.sun.jersey:jersey-client:jar:1.19 - https://jersey.java.net/jersey-client/) + (CDDL 1.1) (GPL2 w/ CPE) jersey-json (com.sun.jersey:jersey-json:jar:1.19 - https://jersey.java.net/jersey-json/) + (CDDL 1.1) (GPL2 w/ CPE) Old JAXB Runtime (com.sun.xml.bind:jaxb-impl:jar:2.2.3-1 - http://jaxb.java.net/) + (CDDL 1.1) (GPL2 w/ CPE) Java Architecture For XML Binding (javax.xml.bind:jaxb-api:jar:2.2.2 - https://jaxb.dev.java.net/) + + +************************ +Common Development and Distribution License 1.0 +************************ +The following binary components are provided under the Common Development and Distribution License 1.0. See project link for details.
+ + (CDDL 1.0) JSR311 API (javax.ws.rs:jsr311-api:jar:1.1.1 - https://jsr311.dev.java.net) + (CDDL 1.0) (GPL3) Streaming API For XML (javax.xml.stream:stax-api:jar:1.0-2 - no url provided) + (CDDL 1.0) JavaBeans Activation Framework (JAF) (javax.activation:activation:jar:1.1 - http://java.sun.com/products/javabeans/jaf/index.jsp) + diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml new file mode 100644 index 0000000000..69f568f12e --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-confluent-platform-bundle + 1.4.0-SNAPSHOT + + + nifi-confluent-schema-registry-service + jar + + + + org.apache.nifi + nifi-schema-registry-service-api + + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-avro-record-utils + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-ssl-context-service-api + + + + com.sun.jersey + jersey-client + + + org.apache.nifi + nifi-web-utils + + + + junit + junit + test + + + + + + confluent + http://packages.confluent.io/maven/ + + + diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java new file mode 100644 index 0000000000..070be75940 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.confluent.schemaregistry; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient; +import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient; +import org.apache.nifi.confluent.schemaregistry.client.SchemaRegistryClient; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaField; +import 
org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; + + + +@Tags({"schema", "registry", "confluent", "avro", "kafka"}) +@CapabilityDescription("Provides a Schema Registry that interacts with the Confluent Schema Registry so that those Schemas that are stored in the Confluent Schema " + + "Registry can be used in NiFi. The Confluent Schema Registry has a notion of a \"subject\" for schemas, which is their terminology for a schema name. When a Schema " + + "is looked up by name by this registry, it will find a Schema in the Confluent Schema Registry with that subject.") +public class ConfluentSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + + private static final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, + SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); + + + static final PropertyDescriptor SCHEMA_REGISTRY_URLS = new PropertyDescriptor.Builder() + .name("url") + .displayName("Schema Registry URLs") + .description("A comma-separated list of URLs of the Schema Registry to interact with") + .expressionLanguageSupported(true) + .defaultValue("http://localhost:8081") + .required(true) + .addValidator(new MultipleURLValidator()) + .build(); + + static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() + .name("ssl-context") + .displayName("SSL Context Service") + .description("Specifies the SSL Context Service to use for interacting with the Confluent Schema Registry") + .identifiesControllerService(SSLContextService.class) + .required(false) + .build(); + + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how 
many Schemas should be cached from the Schema Registry") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + + static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder() + .name("cache-expiration") + .displayName("Cache Expiration") + .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a " + + "cached version of a schema will no longer be used, and the service will have to communicate with the " + + "Schema Registry again in order to obtain the schema.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .required(true) + .build(); + + static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("timeout") + .displayName("Communications Timeout") + .description("Specifies how long to wait to receive data from the Schema Registry before considering the communications a failure") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .required(true) + .build(); + + private volatile SchemaRegistryClient client; + + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(SCHEMA_REGISTRY_URLS); + properties.add(SSL_CONTEXT); + properties.add(TIMEOUT); + properties.add(CACHE_SIZE); + properties.add(CACHE_EXPIRATION); + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final List baseUrls = getBaseURLs(context); + final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + + final SSLContext sslContext; + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); + if (sslContextService == null) { + sslContext = null; + } else { + sslContext = 
sslContextService.createSSLContext(ClientAuth.REQUIRED); + } + + final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext); + + final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); + final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue(); + + client = new CachingSchemaRegistryClient(restClient, cacheSize, cacheExpiration); + } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final boolean sslContextSet = validationContext.getProperty(SSL_CONTEXT).isSet(); + if (sslContextSet) { + final List baseUrls = getBaseURLs(validationContext); + final List insecure = baseUrls.stream() + .filter(url -> !url.startsWith("https")) + .collect(Collectors.toList()); + + if (!insecure.isEmpty()) { + return Collections.singleton(new ValidationResult.Builder() + .subject(SCHEMA_REGISTRY_URLS.getDisplayName()) + .input(insecure.get(0)) + .valid(false) + .explanation("When SSL Context is configured, all Schema Registry URL's must use HTTPS, not HTTP") + .build()); + } + } + + return Collections.emptyList(); + } + + private List getBaseURLs(final PropertyContext context) { + final String urls = context.getProperty(SCHEMA_REGISTRY_URLS).evaluateAttributeExpressions().getValue(); + final List baseUrls = Stream.of(urls.split(",")) + .map(url -> url.trim()) + .collect(Collectors.toList()); + + return baseUrls; + } + + @Override + public String retrieveSchemaText(final String schemaName) throws IOException, SchemaNotFoundException { + final RecordSchema schema = retrieveSchema(schemaName); + return schema.getSchemaText().get(); + } + + @Override + public String retrieveSchemaText(final long schemaId, final int version) throws IOException, SchemaNotFoundException { + final RecordSchema schema = retrieveSchema(schemaId, version); + return schema.getSchemaText().get(); + } + + @Override + public RecordSchema retrieveSchema(final 
String schemaName) throws IOException, SchemaNotFoundException { + final RecordSchema schema = client.getSchema(schemaName); + return schema; + } + + @Override + public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, SchemaNotFoundException { + final RecordSchema schema = client.getSchema((int) schemaId); + return schema; + } + + @Override + public Set getSuppliedSchemaFields() { + return schemaFields; + } +} diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/MultipleURLValidator.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/MultipleURLValidator.java new file mode 100644 index 0000000000..01f8b8466f --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/MultipleURLValidator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.confluent.schemaregistry; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.util.StandardValidators; + +public class MultipleURLValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final String[] splits = input.split(","); + if (splits.length == 0) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("At least one URL must be specified") + .build(); + } + + for (final String split : splits) { + final String url = split.trim(); + + final ValidationResult result = StandardValidators.URL_VALIDATOR.validate(subject, url, context); + if (result != null && !result.isValid()) { + return result; + } + } + + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + +} diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java new file mode 100644 index 0000000000..d82befe047 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.confluent.schemaregistry.client; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordSchema; + + +public class CachingSchemaRegistryClient implements SchemaRegistryClient { + private final SchemaRegistryClient client; + private final long expirationNanos; + + private final Map nameCache; + private final Map idCache; + + + public CachingSchemaRegistryClient(final SchemaRegistryClient toWrap, final int cacheSize, final long expirationNanos) { + this.client = toWrap; + this.expirationNanos = expirationNanos; + + nameCache = new Cache<>(cacheSize); + idCache = new Cache<>(cacheSize); + } + + @Override + public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { + RecordSchema schema = getFromCache(nameCache, schemaName); + if (schema != null) { + return schema; + } + + schema = client.getSchema(schemaName); + + synchronized (nameCache) { + nameCache.put(schemaName, new CachedRecordSchema(schema)); + } + + return schema; + } + + @Override + public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException { + RecordSchema schema = getFromCache(idCache, schemaId); + if (schema != null) { + return schema; + } + + schema = 
client.getSchema(schemaId); + + synchronized (idCache) { + idCache.put(schemaId, new CachedRecordSchema(schema)); + } + + return schema; + } + + private RecordSchema getFromCache(final Map cache, final Object key) { + final CachedRecordSchema cachedSchema; + synchronized (cache) { + cachedSchema = cache.get(key); + } + + if (cachedSchema == null) { + return null; + } + + if (cachedSchema.isOlderThan(System.nanoTime() - expirationNanos)) { + return null; + } + + return cachedSchema.getSchema(); + } + + + private static class Cache extends LinkedHashMap { + private final int cacheSize; + + public Cache(final int cacheSize) { + this.cacheSize = cacheSize; + } + + @Override + protected boolean removeEldestEntry(final Map.Entry eldest) { + return size() >= cacheSize; + } + } + + + private static class CachedRecordSchema { + private final RecordSchema schema; + private final long cachedTimestamp; + + public CachedRecordSchema(final RecordSchema schema) { + this(schema, System.nanoTime()); + } + + public CachedRecordSchema(final RecordSchema schema, final long timestamp) { + this.schema = schema; + this.cachedTimestamp = timestamp; + } + + public RecordSchema getSchema() { + return schema; + } + + public boolean isOlderThan(final long timestamp) { + return cachedTimestamp < timestamp; + } + } +} diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java new file mode 100644 index 0000000000..a19a90db57 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.confluent.schemaregistry.client; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.web.util.WebUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; + + +/** + *

+ * A Client for interacting with Confluent Schema Registry. We make use of Jersey Client to interact with the + * Confluent Schema Registry REST API because the provided schema registry client does not provide a way to + * use HTTPS for interacting with the schema registry (it assumes that system properties will be used, instead of + * an SSLContext) and also does not allow configuration of (or use) timeouts. As a result, if the Schema Registry + * crashed or was shut down, NiFi threads could be stuck indefinitely until NiFi is restarted. To avoid this, + * we make use of Jersey Client and set timeouts appropriately. + *

+ */ +public class RestSchemaRegistryClient implements SchemaRegistryClient { + private final List baseUrls; + private final Client client; + + private static final String SUBJECT_FIELD_NAME = "subject"; + private static final String VERSION_FIELD_NAME = "version"; + private static final String ID_FIELD_NAME = "id"; + private static final String SCHEMA_TEXT_FIELD_NAME = "schema"; + + private final ConcurrentMap schemaNameToIdentifierMap = new ConcurrentHashMap<>(); + private final ConcurrentMap schemaIdentifierToNameMap = new ConcurrentHashMap<>(); + + + public RestSchemaRegistryClient(final List baseUrls, final int timeoutMillis, final SSLContext sslContext) { + this.baseUrls = new ArrayList<>(baseUrls); + + final ClientConfig clientConfig = new DefaultClientConfig(); + clientConfig.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); + clientConfig.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); + client = WebUtils.createClient(clientConfig, sslContext); + } + + + @Override + public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException { + final String pathSuffix = getSubjectPath(schemaName); + final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName); + + final RecordSchema recordSchema = createRecordSchema(responseJson); + return recordSchema; + } + + + @Override + public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException { + // The Confluent Schema Registry's REST API does not provide us with the 'subject' (name) of a Schema given the ID. + // It will provide us only the text of the Schema itself. Therefore, in order to determine the name (which is required for + // a SchemaIdentifier), we must obtain a list of all Schema names, and then request each and every one of the schemas to determine + // if the ID requested matches the Schema's ID. 
+ // To make this more efficient, we will cache a mapping of Schema Name to identifier, so that we can look this up more efficiently. + + // Check if we have cached the Identifier to Name mapping + final String schemaName = schemaIdentifierToNameMap.get(schemaId); + if (schemaName != null) { + return getSchema(schemaName); + } + + final String schemaDescription = "identifier " + schemaId; + final JsonNode schemaNameArray = fetchJsonResponse("/subjects", schemaDescription); + if (!schemaNameArray.isArray()) { + throw new IOException("When determining Subjects that are available, expected a JSON Array but did not receive a valid response"); + } + + final ArrayNode arrayNode = (ArrayNode) schemaNameArray; + for (final JsonNode node : arrayNode) { + final String nodeName = node.getTextValue(); + + final String schemaPath = getSubjectPath(nodeName); + final JsonNode schemaNode = fetchJsonResponse(schemaPath, schemaDescription); + + final int id = schemaNode.get(ID_FIELD_NAME).asInt(); + schemaNameToIdentifierMap.put(nodeName, id); + schemaIdentifierToNameMap.put(id, nodeName); + + if (id == schemaId) { + return createRecordSchema(schemaNode); + } + } + + throw new SchemaNotFoundException("Could not find a schema with identifier " + schemaId); + } + + private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException { + final String subject = schemaNode.get(SUBJECT_FIELD_NAME).getTextValue(); + final int version = schemaNode.get(VERSION_FIELD_NAME).asInt(); + final int id = schemaNode.get(ID_FIELD_NAME).asInt(); + final String schemaText = schemaNode.get(SCHEMA_TEXT_FIELD_NAME).getTextValue(); + + try { + final Schema avroSchema = new Schema.Parser().parse(schemaText); + final SchemaIdentifier schemaId = SchemaIdentifier.of(subject, id, version); + + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId); + return recordSchema; + } catch (final SchemaParseException spe) { + throw new 
SchemaNotFoundException("Obtained Schema with id " + id + " and name " + subject + + " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema"); + } + } + + private String getSubjectPath(final String schemaName) throws UnsupportedEncodingException { + return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest"; + } + + private JsonNode fetchJsonResponse(final String pathSuffix, final String schemaDescription) throws SchemaNotFoundException, IOException { + String errorMessage = null; + for (final String baseUrl : baseUrls) { + final String path = pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix; + final String trimmedBase = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + final String url = trimmedBase + path; + + final WebResource.Builder builder = client.resource(url).accept(MediaType.APPLICATION_JSON); + final ClientResponse response = builder.get(ClientResponse.class); + final int responseCode = response.getStatus(); + + if (responseCode == Response.Status.OK.getStatusCode()) { + final JsonNode responseJson = response.getEntity(JsonNode.class); + return responseJson; + } + + if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) { + throw new SchemaNotFoundException("Could not find Schema with " + schemaDescription + " from the Confluent Schema Registry located at " + baseUrl); + } + + if (errorMessage == null) { + errorMessage = response.getEntity(String.class); + } + } + + throw new IOException("Failed to retrieve Schema with " + schemaDescription + " from any of the Confluent Schema Registry URL's provided; failure response message: " + errorMessage); + } + +} diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java new file mode 100644 index 0000000000..3c8c0cb5aa --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Client-side view of a schema registry from which {@link RecordSchema}s can be obtained
 * either by name or by numeric identifier.
 */
public interface SchemaRegistryClient {

    /**
     * Retrieves the schema registered under the given name.
     *
     * @param schemaName the name of the schema
     * @return the schema
     * @throws IOException if the lookup fails
     * @throws SchemaNotFoundException if the schema cannot be found
     */
    RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException;

    /**
     * Retrieves the schema that has the given identifier.
     *
     * @param schemaId the identifier of the schema
     * @return the schema
     * @throws IOException if the lookup fails
     * @throws SchemaNotFoundException if the schema cannot be found
     */
    RecordSchema getSchema(int schemaId) throws IOException, SchemaNotFoundException;
}
+ +org.apache.nifi.confluent.schemaregistry.ConfluentSchemaRegistry \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/pom.xml b/nifi-nar-bundles/nifi-confluent-platform-bundle/pom.xml new file mode 100644 index 0000000000..db57b04ca6 --- /dev/null +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/pom.xml @@ -0,0 +1,27 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 1.4.0-SNAPSHOT + + nifi-confluent-platform-bundle + pom + A bundle of components that interact with the Confluent Platform + + + nifi-confluent-schema-registry-service + nifi-confluent-platform-nar + + diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java index b335b1156a..4f0f945ea0 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java @@ -16,20 +16,19 @@ */ package org.apache.nifi.schema.access; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + import org.apache.nifi.avro.AvroSchemaValidator; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import java.util.Collection; -import 
java.util.Collections; -import java.util.List; - public class SchemaAccessUtils { public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property", @@ -45,10 +44,13 @@ public class SchemaAccessUtils { "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'"); public static final AllowableValue INHERIT_RECORD_SCHEMA = new AllowableValue("inherit-record-schema", "Inherit Record Schema", "The schema used to write records will be the same schema that was given to the Record when the Record was created."); + public static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new AllowableValue("confluent-encoded", "Confluent Content-Encoded Schema Reference", + "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single " + + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. 
" + + "This is based on version 3.2.x of the Confluent Schema Registry."); - - public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() + public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder() .name("schema-registry") .displayName("Schema Registry") .description("Specifies the Controller Service to use for the Schema Registry") @@ -56,11 +58,11 @@ public class SchemaAccessUtils { .required(false) .build(); - public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder() + public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder() .name("schema-access-strategy") .displayName("Schema Access Strategy") .description("Specifies how to obtain the schema that is to be used for interpreting the data.") - .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA) + .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA) .defaultValue(SCHEMA_NAME_PROPERTY.getValue()) .required(true) .build(); @@ -115,42 +117,11 @@ public class SchemaAccessUtils { private static boolean isSchemaRegistryRequired(final String schemaAccessValue) { return HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue) || SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue) - || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue); + || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue) || CONFLUENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue); } - public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) { - if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) { - return new SchemaNamePropertyStrategy(schemaRegistry, 
context.getProperty(SCHEMA_NAME)); - } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) { - return new InheritSchemaFromRecord(); - } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { - return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT)); - } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { - return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry); - } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { - return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); - } - - return null; - } - - public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { - if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) { - return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME)); - } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) { - return new InheritSchemaFromRecord(); - } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { - return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT)); - } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { - return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry); - } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { - return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); - } - - return null; - } - - public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { + + public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) { if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) 
{ return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME)); } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) { @@ -161,6 +132,8 @@ public class SchemaAccessUtils { return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry); } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { + return new ConfluentSchemaRegistryStrategy(schemaRegistry); } return null; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java new file mode 100644 index 0000000000..f892ab882e --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schema.access; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.stream.io.StreamUtils; + +public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy { + private final Set schemaFields; + private final SchemaRegistry schemaRegistry; + + public ConfluentSchemaRegistryStrategy(final SchemaRegistry schemaRegistry) { + this.schemaRegistry = schemaRegistry; + + schemaFields = new HashSet<>(); + schemaFields.add(SchemaField.SCHEMA_IDENTIFIER); + schemaFields.add(SchemaField.SCHEMA_VERSION); + schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); + } + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + final byte[] buffer = new byte[5]; + try { + StreamUtils.fillBuffer(contentStream, buffer); + } catch (final IOException ioe) { + throw new SchemaNotFoundException("Could not read first 5 bytes from stream", ioe); + } + + // This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer + // as it is provided at: + // http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format + // The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes + // representing the schema id. 
+ final ByteBuffer bb = ByteBuffer.wrap(buffer); + final int magicByte = bb.get(); + if (magicByte != 0) { + throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. " + + "Expected stream to begin with a Magic Byte of 0 but first byte was " + magicByte); + } + + final int schemaId = bb.getInt(); + return schemaRegistry.retrieveSchema(schemaId, 1); + } + + @Override + public Set getSuppliedSchemaFields() { + return schemaFields; + } +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java new file mode 100644 index 0000000000..3677b9fc0d --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.schema.access; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Map; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; + +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + +public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter { + private static final Set requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); + + @Override + public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { + final SchemaIdentifier identifier = schema.getIdentifier(); + final long id = identifier.getIdentifier().getAsLong(); + + // This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer + // as it is provided at: + // http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format + // The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes + // representing the schema id. 
+ final ByteBuffer bb = ByteBuffer.allocate(5); + bb.put((byte) 0); + bb.putInt((int) id); + + out.write(bb.array()); + } + + @Override + public Map getAttributes(final RecordSchema schema) { + return Collections.emptyMap(); + } + + @Override + public void validateSchema(RecordSchema schema) throws SchemaNotFoundException { + final SchemaIdentifier identifier = schema.getIdentifier(); + final OptionalLong identifierOption = identifier.getIdentifier(); + if (!identifierOption.isPresent()) { + throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Identifier is not known"); + } + + final OptionalInt versionOption = identifier.getVersion(); + if (!versionOption.isPresent()) { + throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Version is not known"); + } + } + + @Override + public Set getRequiredSchemaFields() { + return requiredSchemaFields; + } + +} diff --git a/nifi-nar-bundles/nifi-registry-bundle/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/pom.xml index 5944f446a9..636e30f649 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-registry-bundle/pom.xml @@ -24,13 +24,4 @@ nifi-registry-service nifi-registry-nar - - - - org.apache.nifi - nifi-registry-processors - 1.2.0-SNAPSHOT - - - diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index 3027e5fc03..ccb54b03b9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -60,12 +60,6 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme private final ConcurrentMap> schemaVersionByNameCache = new ConcurrentHashMap<>(); private final ConcurrentMap> schemaVersionByKeyCache = new ConcurrentHashMap<>(); - private static final String LOGICAL_TYPE_DATE = "date"; - private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; - private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; - private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; - private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; - private volatile long versionInfoCacheNanos; static final PropertyDescriptor URL = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java index a391a7b30a..14cd60d897 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java @@ -63,10 +63,11 @@ public class CSVUtils { .build(); static final PropertyDescriptor SKIP_HEADER_LINE = new PropertyDescriptor.Builder() .name("Skip Header Line") - .description("Specifies whether or not the first line of CSV should be considered a Header and skipped. 
If the Schema Access Strategy " + .displayName("Treat First Line as Header") + .description("Specifies whether or not the first line of CSV should be considered a Header or should be considered a record. If the Schema Access Strategy " + "indicates that the columns must be defined in the header, then this property will be ignored, since the header must always be " - + "present and won't be processed as a Record. Otherwise, this property should be 'true' if the first non-comment line of CSV " - + "contains header information that needs to be ignored.") + + "present and won't be processed as a Record. Otherwise, if 'true', then the first line of CSV data will not be processed as a record and if 'false', " + + "then the first line will be interpreted as a record.") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .allowableValues("true", "false") diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java index 0acf6ff951..fb28b17644 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java @@ -36,6 +36,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; +import
org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter; import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter; import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter; import org.apache.nifi.schema.access.NopSchemaAccessWriter; @@ -63,6 +64,11 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes", "The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if " + "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data."); + static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new AllowableValue("confluent-encoded", "Confluent Schema Registry Reference", + "The content of the FlowFile will contain a reference to a schema in the Schema Registry service. The reference is encoded as a single " + + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. " + + "This will be prepended to each FlowFile. Note that if the schema for a record does not contain the necessary identifier and version, " + + "an Exception will be thrown when attempting to write the data. 
This is based on the encoding used by version 3.2.x of the Confluent Schema Registry."); static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", "Do Not Write Schema", "Do not add any schema-related information to the FlowFile."); /** @@ -73,8 +79,6 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder() .name("Schema Write Strategy") .description("Specifies how the schema for a Record should be added to the data.") - .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA) - .defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue()) .required(true) .build(); @@ -83,7 +87,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic private volatile SchemaAccessWriter schemaAccessWriter; private final List schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList( - SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)); + SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA)); private final List schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList( SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY)); @@ -156,6 +160,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic return new HortonworksEncodedSchemaReferenceWriter(); } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { return new HortonworksAttributeSchemaReferenceWriter(); + } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { + return new ConfluentSchemaRegistryWriter(); } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) { return new NopSchemaAccessWriter(); } diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java index ddcfb0ce78..53b030a92a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java @@ -51,6 +51,7 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPER import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA; public abstract class SchemaRegistryService extends AbstractControllerService { @@ -59,7 +60,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService { private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); private final List strategyList = Collections.unmodifiableList(Arrays.asList( - SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)); + SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)); protected PropertyDescriptor getSchemaAcessStrategyDescriptor() { return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); diff --git a/nifi-nar-bundles/pom.xml 
b/nifi-nar-bundles/pom.xml index 28ff134adc..06dc5ab4db 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -35,6 +35,7 @@ nifi-kafka-bundle nifi-kite-bundle nifi-solr-bundle + nifi-confluent-platform-bundle nifi-aws-bundle nifi-social-media-bundle nifi-enrich-bundle diff --git a/pom.xml b/pom.xml index a66cbf1080..7d840dc4d6 100644 --- a/pom.xml +++ b/pom.xml @@ -1276,6 +1276,12 @@ 1.4.0-SNAPSHOT nar
+ + org.apache.nifi + nifi-confluent-platform-nar + 1.4.0-SNAPSHOT + nar + org.apache.nifi nifi-kerberos-iaa-providers-nar @@ -1456,7 +1462,7 @@ 1.4.0-SNAPSHOT nar - + org.apache.nifi nifi-properties 1.4.0-SNAPSHOT @@ -1471,6 +1477,11 @@ nifi-security-utils 1.4.0-SNAPSHOT + + org.apache.nifi + nifi-confluent-schema-registry-service + 1.4.0-SNAPSHOT + org.apache.nifi nifi-logging-utils