mirror of https://github.com/apache/nifi.git
NIFI-3354 Added support for simple AVRO/CSV/JSON transformers that utilize external Schema
Added support for a simple Key/Value Schema Registry as a Controller Service.
Added support for registering multiple schemas as dynamic properties of the Schema Registry Controller Service.
Added the following 8 processors:
- ExtractAvroFieldsViaSchemaRegistry
- TransformAvroToCSVViaSchemaRegistry
- TransformAvroToJsonViaSchemaRegistry
- TransformCSVToAvroViaSchemaRegistry
- TransformCSVToJsonViaSchemaRegistry
- TransformJsonToAvroViaSchemaRegistry
- TransformJsonToCSVViaSchemaRegistry
- UpdateAttributeWithSchemaViaSchemaRegistry

Polishing.
NIFI-3354 Adding support for HDFS Schema Registry, unions and default values in the Avro Schema, and NULL columns in the source CSV.
NIFI-3354 Adding support for logicalTypes per the Avro 1.7.7 spec.
NIFI-3354 Polishing and restructuring of CSVUtils.
NIFI-3354 Renamed processors to address PR comment.
NIFI-3354 Addressed latest PR comments:
- removed the HDFS-based ControllerService; it will be migrated into a separate bundle as a true extension
- removed the UpdateAttribute... processor
- added a mime.type attribute to all Transform* processors
NIFI-3354 Added missing L&N entries.

This closes pr/1436
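For illustration only (this snippet is not part of the commit), the kind of Avro schema the description above refers to, with a union that has a default value and a bytes field carrying a field-level decimal logicalType per the Avro 1.7.7 spec, can be expressed and parsed with the stock Avro API roughly as follows; the record and field names are made up:

import org.apache.avro.Schema;

public class SchemaSketch {
    public static void main(String[] args) {
        // hypothetical schema: a nullable field with a default, plus a field-level decimal logical type
        String schemaJson = "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"long\"},"
                + "{\"name\":\"note\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"amount\",\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);
        // the transformers in this commit read these as field-level properties
        System.out.println(schema.getField("amount").getProp("logicalType")); // prints: decimal
    }
}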
This commit is contained in:
parent
ded18b94db
commit
6a1854c975
@@ -368,6 +368,11 @@ language governing permissions and limitations under the License. -->
            <artifactId>nifi-spring-nar</artifactId>
            <type>nar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-registry-nar</artifactId>
            <type>nar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-hive-nar</artifactId>
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Copyright 2016 Hortonworks, Inc. All rights reserved.

    Hortonworks, Inc. licenses this file to you under the Apache License,
    Version 2.0 (the "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software distributed
    under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
    OR CONDITIONS OF ANY KIND, either express or implied. See the License for
    the specific language governing permissions and limitations under the License.
    See the associated NOTICE file for additional information regarding copyright
    ownership.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-registry-bundle</artifactId>
        <version>1.2.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-registry-nar</artifactId>
    <packaging>nar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-registry-processors</artifactId>
        </dependency>
    </dependencies>
</project>
@@ -0,0 +1,240 @@
|
|||
|
||||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
APACHE NIFI SUBCOMPONENTS:
|
||||
|
||||
The Apache NiFi project contains subcomponents with separate copyright
|
||||
notices and license terms. Your use of the source code for these
|
||||
subcomponents is subject to the terms and conditions of the following
|
||||
licenses.
|
||||
|
||||
The binary distribution of this product bundles 'Paranamer Core' which is available
|
||||
under a BSD style license.
|
||||
|
||||
Copyright (c) 2006 Paul Hammant & ThoughtWorks Inc
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions
|
||||
are met:
|
||||
1. Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
2. Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
3. Neither the name of the copyright holders nor the names of its
|
||||
contributors may be used to endorse or promote products derived from
|
||||
this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
|
||||
THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
|
@@ -0,0 +1,66 @@
|
|||
nifi-registry-nar
|
||||
Copyright 2014-2017 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
===========================================
|
||||
Apache Software License v2
|
||||
===========================================
|
||||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Apache Commons IO
|
||||
The following NOTICE information applies:
|
||||
Apache Commons IO
|
||||
Copyright 2002-2016 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Commons Lang
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Lang
|
||||
Copyright 2001-2015 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Avro
|
||||
The following NOTICE information applies:
|
||||
Apache Avro
|
||||
Copyright 2009-2013 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Apache Commons Compress
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Compress
|
||||
Copyright 2002-2014 The Apache Software Foundation
|
||||
|
||||
(ASLv2) Snappy Java
|
||||
The following NOTICE information applies:
|
||||
This product includes software developed by Google
|
||||
Snappy: http://code.google.com/p/snappy/ (New BSD License)
|
||||
|
||||
This product includes software developed by Apache
|
||||
PureJavaCrc32C from apache-hadoop-common http://hadoop.apache.org/
|
||||
(Apache 2.0 license)
|
||||
|
||||
This library contains statically linked libstdc++. This inclusion is allowed by
"GCC Runtime Library Exception"
|
||||
http://gcc.gnu.org/onlinedocs/libstdc++/manual/license.html
|
||||
|
||||
(ASLv2) Jackson JSON processor
|
||||
The following NOTICE information applies:
|
||||
# Jackson JSON processor
|
||||
|
||||
Jackson is a high-performance, Free/Open Source JSON processing library.
|
||||
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
|
||||
been in development since 2007.
|
||||
It is currently developed by a community of developers, as well as supported
|
||||
commercially by FasterXML.com.
|
||||
|
||||
## Licensing
|
||||
|
||||
Jackson core and extension components may be licensed under different licenses.
|
||||
To find the details that apply to this artifact see the accompanying LICENSE file.
|
||||
For more information, including possible other licensing options, contact
|
||||
FasterXML.com (http://fasterxml.com).
|
||||
|
||||
## Credits
|
||||
|
||||
A list of contributors may be found from CREDITS file, which is included
|
||||
in some artifacts (usually source distributions); but is always available
|
||||
from the source code management (SCM) system the project uses.
|
|
@@ -0,0 +1,74 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!-- Copyright 2016 Hortonworks, Inc. All rights reserved. Hortonworks, Inc.
|
||||
licenses this file to you under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License. You may
|
||||
obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software distributed
|
||||
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
|
||||
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
|
||||
the specific language governing permissions and limitations under the License.
|
||||
See the associated NOTICE file for additional information regarding copyright
|
||||
ownership. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-registry-bundle</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-registry-processors</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.rat</groupId>
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes combine.children="append">
|
||||
<exclude>src/test/resources/expected_ouput_csv/*</exclude>
|
||||
<exclude>src/test/resources/input_avro/*</exclude>
|
||||
<exclude>src/test/resources/input_csv/*</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.codehaus.jackson</groupId>
|
||||
<artifactId>jackson-mapper-asl</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-registry-service</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>pl.pragmatists</groupId>
|
||||
<artifactId>JUnitParams</artifactId>
|
||||
<version>1.0.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
|
||||
/**
|
||||
* Base processor for implementing transform-like processors for CSV
|
||||
* transformations that integrate with Schema Registry (see
|
||||
* {@link SchemaRegistry})
|
||||
*/
|
||||
abstract class AbstractCSVTransformer extends AbstractContentTransformer {
|
||||
|
||||
static final List<PropertyDescriptor> BASE_CSV_DESCRIPTORS;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||
descriptors.addAll(BASE_DESCRIPTORS);
|
||||
descriptors.add(DELIMITER);
|
||||
BASE_CSV_DESCRIPTORS = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
protected volatile char delimiter;
|
||||
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return BASE_CSV_DESCRIPTORS;
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
super.onScheduled(context);
|
||||
this.delimiter = context.getProperty(DELIMITER).getValue().charAt(0);
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,101 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
|
||||
/**
|
||||
* Base processor for implementing transform-like processors that integrate with
|
||||
* Schema Registry (see {@link SchemaRegistry})
|
||||
*/
|
||||
abstract class AbstractContentTransformer extends BaseContentTransformer implements RegistryCommon {
|
||||
|
||||
static final List<PropertyDescriptor> BASE_DESCRIPTORS;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||
descriptors.add(REGISTRY_SERVICE);
|
||||
descriptors.add(SCHEMA_NAME);
|
||||
descriptors.add(SCHEMA_TYPE);
|
||||
BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
volatile SchemaRegistry schemaRegistryDelegate;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) {
|
||||
Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties);
|
||||
return this.transform(in, out, contextProperties, schema);
|
||||
}
|
||||
|
||||
/**
|
||||
* This operation is designed to allow sub-classes to provide
|
||||
* implementations that read content of the provided {@link InputStream} and
|
||||
* write content (same or different) into the provided {@link OutputStream}.
|
||||
* Both {@link InputStream} and {@link OutputStream} represent the content
|
||||
* of the in/out {@link FlowFile} and are both required to NOT be null;
|
||||
* <p>
|
||||
* The returned {@link Map} represents attributes that will be added to the
|
||||
* outgoing FlowFile. It can be null, in which case no attributes will be
|
||||
* added to the resulting {@link FlowFile}.
|
||||
*
|
||||
*
|
||||
* @param in
|
||||
* {@link InputStream} representing data to be transformed
|
||||
* @param out
|
||||
* {@link OutputStream} representing target stream to write
|
||||
* transformed data. Can be null if no output needs to be
|
||||
* written.
|
||||
* @param contextProperties
|
||||
* instance of {@link InvocationContextProperties}
|
||||
* @param schema
|
||||
* instance of {@link Schema}
|
||||
*/
|
||||
protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return BASE_DESCRIPTORS;
|
||||
}
|
||||
}
|
|
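As a hedged illustration of how the base class above is meant to be extended (this class is not one of the processors added by the commit; its name and the mime.type value are hypothetical), a concrete subclass only has to implement the schema-aware transform, and can reuse the AvroUtils helper introduced later in this change:

package org.apache.nifi.schemaregistry.processors;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Hypothetical example: echoes an Avro record back out using the schema that
// AbstractContentTransformer resolved from the configured Schema Registry.
public class EchoAvroTransformer extends AbstractContentTransformer {

    @Override
    protected Map<String, String> transform(InputStream in, OutputStream out,
            InvocationContextProperties contextProperties, Schema schema) {
        // read the incoming content as a GenericRecord and write it back unchanged
        GenericRecord avroRecord = AvroUtils.read(in, schema);
        AvroUtils.write(avroRecord, out);
        // attributes returned here are added to the outgoing FlowFile
        return Collections.singletonMap("mime.type", "application/avro-binary");
    }
}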
@@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
|
||||
/**
|
||||
* Base processor for implementing transform-like processors that integrate with
|
||||
* Schema Registry (see {@link SchemaRegistry})
|
||||
*/
|
||||
abstract class AbstractTransformer extends BaseTransformer implements RegistryCommon {
|
||||
|
||||
static final List<PropertyDescriptor> BASE_DESCRIPTORS;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||
descriptors.add(REGISTRY_SERVICE);
|
||||
descriptors.add(SCHEMA_NAME);
|
||||
BASE_DESCRIPTORS = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
volatile SchemaRegistry schemaRegistryDelegate;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
this.schemaRegistryDelegate = context.getProperty(REGISTRY_SERVICE).asControllerService(SchemaRegistry.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* This operation is designed to allow sub-classes to provide
|
||||
* implementations that read the content of the provided {@link InputStream},
* which represents the content of the incoming {@link FlowFile}.
|
||||
* <p>
|
||||
* The returned {@link Map} represents attributes that will be added to the
|
||||
* outgoing FlowFile.
|
||||
*
|
||||
*
|
||||
* @param in
|
||||
* {@link InputStream} representing data to be transformed
|
||||
* @param contextProperties
|
||||
* instance of {@link InvocationContextProperties}
|
||||
* @param schema
|
||||
* instance of avro {@link Schema}
|
||||
*/
|
||||
protected abstract Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema);
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return BASE_DESCRIPTORS;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties) {
|
||||
Schema schema = RegistryCommon.retrieveSchema(this.schemaRegistryDelegate, contextProperties);
|
||||
return this.transform(in, contextProperties, schema);
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.BinaryEncoder;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.io.EncoderFactory;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
|
||||
/**
|
||||
* Various Avro related utility operations relevant to transforming contents of
|
||||
* the {@link FlowFile} between Avro formats.
|
||||
*/
|
||||
class AvroUtils {
|
||||
|
||||
/**
|
||||
* Reads provided {@link InputStream} into Avro {@link GenericRecord}
|
||||
* applying provided {@link Schema} returning the resulting GenericRecord.
|
||||
*/
|
||||
public static GenericRecord read(InputStream in, Schema schema) {
|
||||
GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
|
||||
GenericRecord avroRecord = null;
|
||||
try {
|
||||
avroRecord = datumReader.read(null, DecoderFactory.get().binaryDecoder(in, null));
|
||||
return avroRecord;
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to read AVRO record", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes provided {@link GenericRecord} into the provided
|
||||
* {@link OutputStream}.
|
||||
*/
|
||||
public static void write(GenericRecord record, OutputStream out) {
|
||||
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
|
||||
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
|
||||
try {
|
||||
writer.write(record, encoder);
|
||||
encoder.flush();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to write AVRO record", e);
|
||||
}
|
||||
}
|
||||
}
|
|
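A brief usage sketch of the two helpers above (not part of the commit; it is placed in the same package so the package-private AvroUtils is visible, and the one-field schema is made up):

package org.apache.nifi.schemaregistry.processors;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroUtilsRoundTrip {
    public static void main(String[] args) {
        // made-up single-field schema for the example
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        AvroUtils.write(user, out);            // record -> Avro binary
        GenericRecord copy = AvroUtils.read(new ByteArrayInputStream(out.toByteArray()), schema);
        System.out.println(copy.get("name"));  // prints: alice
    }
}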
@@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.io.StreamCallback;
|
||||
|
||||
/**
|
||||
* Base processor which contains common functionality for processors that
|
||||
* receive {@link FlowFile} and output {@link FlowFile} while also modifying the
|
||||
* content of the {@link FlowFile}
|
||||
*/
|
||||
public abstract class BaseContentTransformer extends BaseTransformer {
|
||||
|
||||
@Override
|
||||
protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) {
|
||||
AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>();
|
||||
flowFile = session.write(flowFile, new StreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in, OutputStream out) throws IOException {
|
||||
attributeRef.set(transform(in, out, contextProperties));
|
||||
}
|
||||
});
|
||||
if (attributeRef.get() != null) {
|
||||
flowFile = session.putAllAttributes(flowFile, attributeRef.get());
|
||||
}
|
||||
return flowFile;
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,189 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||
import org.apache.nifi.processor.Processor;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||
|
||||
/**
|
||||
* Base processor which contains common functionality for processors that
|
||||
* receive {@link FlowFile} and output {@link FlowFile} and contain only two
|
||||
* {@link Relationship}s (i.e., success and failure). Every successful execution
|
||||
* of
|
||||
* {@link #doTransform(ProcessContext, ProcessSession, FlowFile, InvocationContextProperties)}
|
||||
* operation will result in transferring {@link FlowFile} to 'success'
|
||||
* relationship while any exception will result in such file going to 'failure'.
|
||||
*/
|
||||
public abstract class BaseTransformer extends AbstractProcessor {
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("Successfully retrieved schema from Schema Registry")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.name("failure")
|
||||
.description("FlowFiles that failed to find a schema are sent to this relationship")
|
||||
.build();
|
||||
|
||||
private static final Set<Relationship> BASE_RELATIONSHIPS;
|
||||
|
||||
static {
|
||||
Set<Relationship> relationships = new HashSet<>();
|
||||
relationships.add(REL_SUCCESS);
|
||||
relationships.add(REL_FAILURE);
|
||||
BASE_RELATIONSHIPS = Collections.unmodifiableSet(relationships);
|
||||
}
|
||||
|
||||
private final Map<PropertyDescriptor, String> propertyInstanceValues = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
if (flowFile != null) {
|
||||
try {
|
||||
InvocationContextProperties contextProperties = new InvocationContextProperties(context, flowFile);
|
||||
flowFile = this.doTransform(context, session, flowFile, contextProperties);
|
||||
session.transfer(flowFile, REL_SUCCESS);
|
||||
} catch (Exception e) {
|
||||
this.getLogger().error("Failed FlowFile processing, routing to failure. Issue: " + e.getMessage(), e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
} else {
|
||||
context.yield();
|
||||
}
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
List<PropertyDescriptor> propertyDescriptors = this.getSupportedPropertyDescriptors();
|
||||
for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
|
||||
if (!propertyDescriptor.isExpressionLanguageSupported()){
|
||||
this.propertyInstanceValues.put(propertyDescriptor, context.getProperty(propertyDescriptor).getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
protected FlowFile doTransform(ProcessContext context, ProcessSession session, FlowFile flowFile, InvocationContextProperties contextProperties) {
|
||||
AtomicReference<Map<String, String>> attributeRef = new AtomicReference<Map<String, String>>();
|
||||
session.read(flowFile, new InputStreamCallback() {
|
||||
@Override
|
||||
public void process(InputStream in) throws IOException {
|
||||
attributeRef.set(transform(in, null, contextProperties));
|
||||
}
|
||||
});
|
||||
if (attributeRef.get() != null) {
|
||||
flowFile = session.putAllAttributes(flowFile, attributeRef.get());
|
||||
}
|
||||
return flowFile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return BASE_RELATIONSHIPS;
|
||||
}
|
||||
|
||||
/**
|
||||
* This operation is designed to allow sub-classes to provide
|
||||
* implementations that read content of the provided {@link InputStream} and
|
||||
* write content (same or different) into the provided
|
||||
* {@link OutputStream}. Both {@link InputStream} and {@link OutputStream}
|
||||
* represent the content of the in/out {@link FlowFile}. The
|
||||
* {@link OutputStream} can be null if no output needs to be written.
|
||||
* <p>
|
||||
* The returned {@link Map} represents attributes that will be added to the
|
||||
* outgoing FlowFile.
|
||||
*
|
||||
*
|
||||
* @param in
|
||||
* {@link InputStream} representing data to be transformed
|
||||
* @param out
|
||||
* {@link OutputStream} representing target stream to write
|
||||
* transformed data. Can be null if no output needs to be
|
||||
* written.
|
||||
* @param contextProperties
|
||||
* instance of {@link InvocationContextProperties}
|
||||
*/
|
||||
protected abstract Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties);
|
||||
|
||||
/**
|
||||
* Properties object that gathers the value of the
|
||||
* {@link PropertyDescriptor} within the context of
|
||||
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)}
|
||||
* invocation. It maintains the knowledge of instance properties vs.
|
||||
* invocation properties, the values of which are set by evaluating
* expressions against the incoming {@link FlowFile}.
|
||||
*/
|
||||
public class InvocationContextProperties {
|
||||
private final Map<PropertyDescriptor, String> propertyInvocationValues = new HashMap<>();
|
||||
|
||||
InvocationContextProperties(ProcessContext context, FlowFile flowFile) {
|
||||
List<PropertyDescriptor> propertyDescriptors = BaseTransformer.this.getSupportedPropertyDescriptors();
|
||||
for (PropertyDescriptor propertyDescriptor : propertyDescriptors) {
|
||||
if (propertyDescriptor.isExpressionLanguageSupported()) {
|
||||
PropertyValue value = context.getProperty(propertyDescriptor)
|
||||
.evaluateAttributeExpressions(flowFile);
|
||||
this.propertyInvocationValues.put(propertyDescriptor, value.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value of the property within the context of
|
||||
* {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory)}
|
||||
* invocation.
|
||||
*/
|
||||
public String getPropertyValue(PropertyDescriptor propertyDescriptor, boolean notNull) {
|
||||
String propertyValue = propertyInstanceValues.containsKey(propertyDescriptor)
|
||||
? propertyInstanceValues.get(propertyDescriptor)
|
||||
: propertyInvocationValues.get(propertyDescriptor);
|
||||
if (notNull && propertyValue == null) {
|
||||
throw new IllegalArgumentException("Property '" + propertyDescriptor + "' evaluatd to null");
|
||||
}
|
||||
return propertyValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Instance: " + propertyInstanceValues + "; Invocation: " + propertyInvocationValues;
|
||||
}
|
||||
}
|
||||
}
|
|
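One small usage note on InvocationContextProperties (illustrative, not from the commit): inside a subclass's transform(...), properties that support expression language are evaluated against the triggering FlowFile and read back through the captured properties, for example with the SCHEMA_NAME descriptor referenced by the abstract transformers above (defined in RegistryCommon, which is not shown in this excerpt):

// inside a transform(...) implementation; passing 'true' fails fast if the
// property evaluated to null for this FlowFile
String schemaName = contextProperties.getPropertyValue(SCHEMA_NAME, true);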
@@ -0,0 +1,299 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.StringWriter;
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.DecimalFormat;
|
||||
import java.text.NumberFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.avro.Schema.Type;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericData.Record;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringEscapeUtils;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.codehaus.jackson.JsonNode;
|
||||
import org.codehaus.jackson.node.BooleanNode;
|
||||
import org.codehaus.jackson.node.DoubleNode;
|
||||
import org.codehaus.jackson.node.IntNode;
|
||||
import org.codehaus.jackson.node.LongNode;
|
||||
import org.codehaus.jackson.node.TextNode;
|
||||
|
||||
/**
|
||||
* Various CSV related utility operations relevant to transforming contents of
|
||||
* the {@link FlowFile} between CSV and AVRO formats.
|
||||
*/
|
||||
class CSVUtils {
|
||||
/**
|
||||
* Provides a {@link Validator} to ensure that provided value is a valid
|
||||
* character.
|
||||
*/
|
||||
public static final Validator CHAR_VALIDATOR = new Validator() {
|
||||
@Override
|
||||
public ValidationResult validate(String subject, String input, ValidationContext context) {
|
||||
// Allows special, escaped characters as input, which is then un-escaped and converted to a single character.
|
||||
// Examples for special characters: \t (or \u0009), \f.
|
||||
if (input.length() > 1) {
|
||||
input = StringEscapeUtils.unescapeJava(input);
|
||||
}
|
||||
return new ValidationResult.Builder().subject(subject).input(input)
|
||||
.explanation("Only non-null single characters are supported")
|
||||
.valid(input.length() == 1 && input.charAt(0) != 0).build();
|
||||
}
|
||||
};
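    // Illustrative aside (not part of the original file): an input of "\t" (two
    // characters, a backslash and a 't') is unescaped to a single tab character and
    // therefore validates, while a two-character input such as "ab" is rejected.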
|
||||
|
||||
public static GenericRecord read(InputStream record, char delimiter, Schema schema, char quoteChar) {
|
||||
Record avroRecord = new GenericData.Record(schema);
|
||||
String[] parsedRecord = parseFields(convertInputStreamToString(record), delimiter, quoteChar);
|
||||
List<Field> fields = schema.getFields();
|
||||
if (parsedRecord.length != fields.size()) {
|
||||
throw new IllegalStateException("Incompatible schema. Parsed fields count does not match the count of fields from schema. "
|
||||
+ "Schema: " + schema.toString(true) + "\n Record: " + record);
|
||||
}
|
||||
|
||||
for (int i = 0; i < fields.size(); i++) {
|
||||
Field field = fields.get(i);
|
||||
Type type = field.schema().getType();
|
||||
updateRecord(field, type, parsedRecord[i], avroRecord);
|
||||
}
|
||||
return avroRecord;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses provided record into fields using provided delimiter. The
|
||||
* 'quoteChar' is used to ensure that if a delimiter char is in quotes it
|
||||
* will not be parsed into a separate field.
|
||||
*/
|
||||
public static String[] parseFields(String record, char delimiter, char quoteChar) {
|
||||
List<String> result = new ArrayList<String>();
|
||||
int start = 0;
|
||||
boolean inQuotes = false;
|
||||
for (int i = 0; i < record.length(); i++) {
|
||||
if (record.charAt(i) == quoteChar) {
|
||||
inQuotes = !inQuotes;
|
||||
}
|
||||
boolean atLastChar = (i == record.length() - 1);
|
||||
if (atLastChar) {
|
||||
if (record.charAt(i) == delimiter) {
|
||||
//missing last column value, add NULL
|
||||
result.add(record.substring(start,i));
|
||||
result.add(null);
|
||||
} else {
|
||||
result.add(record.substring(start));
|
||||
}
|
||||
} else if (record.charAt(i) == delimiter && !inQuotes) {
|
||||
if (start == i) {
|
||||
//There is no value, so add NULL to indicate the absence of a value for this field.
|
||||
result.add(null);
|
||||
} else {
|
||||
result.add(record.substring(start, i));
|
||||
}
|
||||
start = i + 1;
|
||||
}
|
||||
}
|
||||
return result.toArray(new String[]{});
|
||||
}
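    // Illustrative aside (not part of the original file): assuming ',' as the delimiter
    // and '"' as the quote character, the method above splits
    //     a,"b,c",,d
    // into ["a", "\"b,c\"", null, "d"]; the quoted delimiter is preserved inside its
    // field, and an empty column becomes null rather than an empty string.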
|
||||
|
||||
/**
|
||||
* Writes {@link GenericRecord} as CSV (delimited) record to the
|
||||
* {@link OutputStream} using provided delimiter.
|
||||
*/
|
||||
public static void write(GenericRecord record, char delimiter, OutputStream out) {
|
||||
List<Field> fields = record.getSchema().getFields();
|
||||
|
||||
String delimiterToUse = "";
|
||||
try {
|
||||
for (Field field : fields) {
|
||||
out.write(delimiterToUse.getBytes(StandardCharsets.UTF_8));
|
||||
Object fieldValue = record.get(field.name());
|
||||
if (null == fieldValue) {
|
||||
out.write(new byte[0]);
|
||||
} else {
|
||||
if (Type.BYTES == field.schema().getType()) {
|
||||
// need to create it from the ByteBuffer it is serialized as.
|
||||
// need to ensure the type is one of the logical ones we support and if so convert it.
|
||||
if(!"decimal".contentEquals(field.getProp("logicalType"))){
|
||||
throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '" +
|
||||
field.getProp("logicalType") + "' that is currently not supported.");
|
||||
}
|
||||
|
||||
JsonNode rawPrecision = field.getJsonProp("precision");
|
||||
if(null == rawPrecision){
|
||||
throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property");
|
||||
}
|
||||
int precision = rawPrecision.asInt();
|
||||
JsonNode rawScale = field.getJsonProp("scale");
|
||||
int scale = null == rawScale ? 0 : rawScale.asInt();
|
||||
|
||||
// write out the decimal with the precision and scale.
|
||||
NumberFormat numberFormat = DecimalFormat.getInstance();
|
||||
numberFormat.setGroupingUsed(false);
|
||||
normalizeNumberFormat(numberFormat, scale, precision);
|
||||
final String rawValue = new String(((ByteBuffer)fieldValue).array());
|
||||
out.write(numberFormat.format(new BigDecimal(rawValue)).getBytes(StandardCharsets.UTF_8));
|
||||
} else {
|
||||
out.write(fieldValue.toString().getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
if (delimiterToUse.length() == 0) {
|
||||
delimiterToUse = String.valueOf(delimiter);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException("Failed to parse AVRO Record", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* According to the Avro 1.7.7 spec, if a logical type is invalid (for example a
* decimal with scale greater than its precision), then implementations should
* ignore the logical type and use the underlying Avro type.
|
||||
*/
|
||||
private static void normalizeNumberFormat(NumberFormat numberFormat, int scale, int precision) {
|
||||
if (scale < precision) {
|
||||
// write out with the specified precision and scale.
|
||||
numberFormat.setMaximumIntegerDigits(precision);
|
||||
numberFormat.setMaximumFractionDigits(scale);
|
||||
numberFormat.setMinimumFractionDigits(scale);
|
||||
}
|
||||
}
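    // Illustrative aside (not part of the original file): with precision 10 and scale 2
    // the format keeps exactly two fraction digits, so a raw value of "123.456" is
    // written out as "123.46"; if scale >= precision the logical type is ignored as
    // described above and the format is left untouched.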
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static String convertInputStreamToString(InputStream record) {
|
||||
StringWriter writer = new StringWriter();
|
||||
try {
|
||||
IOUtils.copy(record, writer, StandardCharsets.UTF_8);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to read InputStream into String", e);
|
||||
}
|
||||
return writer.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
private static ByteBuffer encodeLogicalType(final Field field, final String fieldValue) {
|
||||
String logicalType = field.getProp("logicalType");
|
||||
if (!"decimal".contentEquals(logicalType)) {
|
||||
throw new IllegalArgumentException("The field '" + field.name() + "' has a logical type of '" + logicalType
|
||||
+ "' that is currently not supported.");
|
||||
}
|
||||
|
||||
JsonNode rawPrecision = field.getJsonProp("precision");
|
||||
if (null == rawPrecision) {
|
||||
throw new IllegalArgumentException("The field '" + field.name() + "' is missing the required precision property");
|
||||
}
|
||||
int precision = rawPrecision.asInt();
|
||||
JsonNode rawScale = field.getJsonProp("scale");
|
||||
int scale = null == rawScale ? 0 : rawScale.asInt();
|
||||
|
||||
NumberFormat numberFormat = DecimalFormat.getInstance();
|
||||
numberFormat.setGroupingUsed(false);
|
||||
normalizeNumberFormat(numberFormat, scale, precision);
|
||||
BigDecimal decimal = null == fieldValue ? new BigDecimal(retrieveDefaultFieldValue(field).asText()) : new BigDecimal(fieldValue);
|
||||
return ByteBuffer.wrap(numberFormat.format(decimal).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the default value declared for the given field, throwing an
* {@link IllegalArgumentException} if no default is supplied in the Avro Schema.
|
||||
*/
|
||||
private static JsonNode retrieveDefaultFieldValue(Field field) {
|
||||
JsonNode jsonNode = field.defaultValue();
|
||||
if (null == jsonNode) {
|
||||
throw new IllegalArgumentException("The field '" + field.name() + "' is NULL and there is no default value supplied in the Avro Schema");
|
||||
}
|
||||
return jsonNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value of the given field on the Avro record, converting the provided String to the
* field's declared type or falling back to the schema default when the provided value is null.
|
||||
*/
|
||||
private static void updateRecord(Field field, Type type, String providedValue, Record avroRecord) {
|
||||
if (Type.NULL != type) {
|
||||
Object value;
|
||||
if (Type.INT == type) {
|
||||
value = null == providedValue ? possiblyGetDefaultValue(field, IntNode.class).getIntValue()
|
||||
: Integer.parseInt(providedValue);
|
||||
avroRecord.put(field.name(), value);
|
||||
} else if (Type.BOOLEAN == type) {
|
||||
value = null == providedValue
|
||||
? possiblyGetDefaultValue(field, BooleanNode.class).getBooleanValue()
|
||||
: Boolean.parseBoolean(providedValue);
|
||||
avroRecord.put(field.name(), value);
|
||||
} else if (Type.DOUBLE == type) {
|
||||
value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue()
|
||||
: Double.parseDouble(providedValue);
|
||||
avroRecord.put(field.name(), value);
|
||||
} else if (Type.FLOAT == type) {
|
||||
value = null == providedValue ? possiblyGetDefaultValue(field, DoubleNode.class).getDoubleValue()
|
||||
: Float.parseFloat(providedValue);
|
||||
avroRecord.put(field.name(), value);
|
||||
} else if (Type.LONG == type) {
|
||||
value = null == providedValue ? possiblyGetDefaultValue(field, LongNode.class).getLongValue()
|
||||
: Long.parseLong(providedValue);
|
||||
avroRecord.put(field.name(), value);
|
||||
} else if (Type.STRING == type) {
|
||||
value = null == providedValue ? possiblyGetDefaultValue(field, TextNode.class).getTextValue()
|
||||
: providedValue;
|
||||
avroRecord.put(field.name(), value);
|
||||
} else if (Type.BYTES == type) {
|
||||
value = encodeLogicalType(field, providedValue);
|
||||
avroRecord.put(field.name(), value);
|
||||
} else if (Type.UNION == type) {
|
||||
field.schema().getTypes()
|
||||
.forEach(schema -> updateRecord(field, schema.getType(), providedValue, avroRecord));
|
||||
} else if (Type.ARRAY == type || Type.ENUM == type || Type.FIXED == type || Type.MAP == type
|
||||
|| Type.NULL == type || Type.RECORD == type) {
|
||||
throw new IllegalArgumentException("The field type '" + type + "' is not supported at the moment");
|
||||
} else {
|
||||
avroRecord.put(field.name(), providedValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if there is a default value to use, if not will throw
|
||||
* {@link IllegalArgumentException}
|
||||
*/
|
||||
private static <T extends JsonNode> JsonNode possiblyGetDefaultValue(Field field, Class<T> expectedDefaultType) {
|
||||
JsonNode jsonNode = retrieveDefaultFieldValue(field);
|
||||
if (field.schema().getType() != Type.UNION && !expectedDefaultType.isAssignableFrom(jsonNode.getClass())) {
|
||||
// since we do not support schema evolution here we need to throw an
|
||||
// exception here as the data is in error.
|
||||
throw new IllegalArgumentException("The field '" + field.name() + "' has a default value that "
|
||||
+ "does not match the field type. Field Type is: '" + expectedDefaultType.getName() + "' and the "
|
||||
+ "default value type is: '" + field.defaultValue().toString());
|
||||
}
|
||||
return jsonNode;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
@Tags({ "registry", "schema", "avro", "extract", "evaluate" })
|
||||
@CapabilityDescription("Extracts Avro field and assigns it to the FlowFile attribute")
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
@DynamicProperty(name = "Avro field name", value = "FlowFile attribute name to set the extracted field",
|
||||
description = "The value of the Avro field specified by 'Avro field name' will be extracted and set as "
|
||||
+ "FlowFile attribute under name specified by the value of this property.")
|
||||
public final class ExtractAvroFields extends AbstractTransformer {
|
||||
|
||||
private static final List<PropertyDescriptor> DESCRIPTORS;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||
descriptors.addAll(BASE_DESCRIPTORS);
|
||||
descriptors.add(SCHEMA_TYPE);
|
||||
DESCRIPTORS = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
private volatile Map<String, String> dynamicProperties;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return DESCRIPTORS;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
super.onScheduled(context);
|
||||
this.dynamicProperties = context.getProperties().entrySet().stream()
|
||||
.filter(p -> p.getKey().isDynamic())
|
||||
.collect(Collectors.toMap(p -> p.getKey().getName(), p -> p.getValue()));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.expressionLanguageSupported(false)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.required(false)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, InvocationContextProperties contextProperties, Schema schema) {
|
||||
GenericRecord avroRecord = AvroUtils.read(in, schema);
|
||||
Map<String, String> attributes = this.dynamicProperties.entrySet().stream().collect(
|
||||
Collectors.toMap(dProp -> dProp.getValue(), dProp -> String.valueOf(avroRecord.get(dProp.getKey()))));
|
||||
return Collections.unmodifiableMap(attributes);
|
||||
}
|
||||
}
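A minimal sketch of the attribute mapping performed by the transform above (illustrative only, not part of the commit; the class name, schema, and property values are made up): the dynamic property name selects the Avro field, and its value names the FlowFile attribute that receives the field's value.

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class ExtractAvroFieldsSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("name", "John Dow");

        // dynamic property: key = Avro field name, value = FlowFile attribute to set
        Map<String, String> dynamicProperties = Collections.singletonMap("name", "user.name");
        Map<String, String> attributes = dynamicProperties.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getValue, e -> String.valueOf(avroRecord.get(e.getKey()))));
        System.out.println(attributes); // {user.name=John Dow}
    }
}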
|
|
@ -0,0 +1,74 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericData;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.Decoder;
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.io.EncoderFactory;
|
||||
import org.apache.avro.io.JsonEncoder;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
|
||||
/**
|
||||
* Various JSON related utility operations relevant to transforming contents of
|
||||
* the {@link FlowFile} between JSON and AVRO formats.
|
||||
*/
|
||||
class JsonUtils {
|
||||
|
||||
/**
|
||||
* Writes provided {@link GenericRecord} into the provided
|
||||
* {@link OutputStream} as JSON.
|
||||
*/
|
||||
public static void write(GenericRecord record, OutputStream out) {
|
||||
try {
|
||||
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
|
||||
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), out);
|
||||
writer.write(record, encoder);
|
||||
encoder.flush();
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to read GenericRecord", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads provided {@link InputStream} as JSON into Avro
|
||||
* {@link GenericRecord} applying provided {@link Schema} returning the
|
||||
* resulting GenericRecord.
|
||||
*/
|
||||
public static GenericRecord read(InputStream jsonIs, Schema schema) {
|
||||
DataInputStream din = new DataInputStream(jsonIs);
|
||||
try {
|
||||
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
|
||||
DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
|
||||
return reader.read(null, decoder);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to parse incoming Json input stream into Avro GenericRecord. "
|
||||
+ "Possible reason: the value may not be a valid JSON or incompatible schema is provided. Schema was '"
|
||||
+ schema.toString(true) + "'.", e);
|
||||
}
|
||||
}
|
||||
}
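A minimal round-trip sketch of the two operations above (illustrative only, not part of the commit; since JsonUtils is package-private, the sketch assumes it lives in the same package, and the schema and data are made up):

package org.apache.nifi.schemaregistry.processors;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class JsonAvroRoundTripSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"favorite_number\",\"type\":\"int\"}]}");
        String json = "{\"name\":\"John Dow\",\"favorite_number\":13}";

        // JSON -> Avro GenericRecord -> JSON
        GenericRecord record = JsonUtils.read(new ByteArrayInputStream(json.getBytes()), schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        JsonUtils.write(record, out);
        System.out.println(out); // prints the same JSON document
    }
}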
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.schemaregistry.processors.BaseTransformer.InvocationContextProperties;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
|
||||
/**
|
||||
* Strategy that encapsulates common properties and functionalities used by all
|
||||
* processors that integrate with Schema Registry.
|
||||
*/
|
||||
interface RegistryCommon {
|
||||
|
||||
static final String SCHEMA_ATTRIBUTE_NAME = "schema.text";
|
||||
|
||||
static final PropertyDescriptor REGISTRY_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("schema-registry-service")
|
||||
.displayName("Schema Registry Service")
|
||||
.description("The Schema Registry Service for serializing/deserializing messages as well as schema retrieval.")
|
||||
.required(true)
|
||||
.identifiesControllerService(SchemaRegistry.class)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
|
||||
.name("schema-name")
|
||||
.displayName("Schema Name")
|
||||
.description("The name of schema.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor SCHEMA_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("schema-type")
|
||||
.displayName("Schema Type")
|
||||
.description("The type of schema (avro is the the only current supported schema).")
|
||||
.required(true)
|
||||
.allowableValues("avro")
|
||||
.defaultValue("avro")
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder()
|
||||
.name("csv-delimiter")
|
||||
.displayName("CSV delimiter")
|
||||
.description("Delimiter character for CSV records")
|
||||
.addValidator(CSVUtils.CHAR_VALIDATOR)
|
||||
.defaultValue(",")
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder()
|
||||
.name("csv-quote-character")
|
||||
.displayName("CSV quote character")
|
||||
.description("Quote character for CSV values")
|
||||
.addValidator(CSVUtils.CHAR_VALIDATOR)
|
||||
.defaultValue("\"")
|
||||
.build();
|
||||
/**
|
||||
* Utility operation to retrieve and parse {@link Schema} from Schema
|
||||
* Registry using provided {@link SchemaRegistry};
|
||||
*/
|
||||
static Schema retrieveSchema(SchemaRegistry schemaRegistry, InvocationContextProperties contextProperties) {
|
||||
String schemaName = contextProperties.getPropertyValue(SCHEMA_NAME, true);
|
||||
String schemaText = schemaRegistry.retrieveSchemaText(schemaName);
|
||||
return new Schema.Parser().parse(schemaText);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
|
||||
@Tags({ "registry", "schema", "avro", "csv", "transform" })
|
||||
@CapabilityDescription("Transforms AVRO content of the Flow File to CSV using the schema provided by the Schema Registry Service.")
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
public final class TransformAvroToCSV extends AbstractCSVTransformer {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
|
||||
byte[] buff;
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
IOUtils.copy(in, bos);
buff = bos.toByteArray();
} catch (Exception e) {
// propagate the failure instead of swallowing it, which would otherwise surface as a NullPointerException below
throw new IllegalStateException("Failed to read AVRO content of the FlowFile", e);
}
ByteArrayInputStream is = new ByteArrayInputStream(buff);
|
||||
GenericRecord avroRecord = AvroUtils.read(is, schema);
|
||||
CSVUtils.write(avroRecord, this.delimiter, out);
|
||||
return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
|
||||
|
||||
@Tags({ "registry", "schema", "avro", "json", "transform" })
|
||||
@CapabilityDescription("Transforms AVRO content of the Flow File to JSON using the schema provided by the Schema Registry Service.")
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
public final class TransformAvroToJson extends AbstractContentTransformer {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
|
||||
GenericRecord avroRecord = AvroUtils.read(in, schema);
|
||||
JsonUtils.write(avroRecord, out);
|
||||
return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
|
||||
@Tags({ "csv", "avro", "transform", "registry", "schema" })
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Transforms CSV content of the Flow File to Avro using the schema provided by the Schema Registry Service.")
|
||||
public final class TransformCSVToAvro extends AbstractCSVTransformer {
|
||||
|
||||
private static final List<PropertyDescriptor> DESCRIPTORS;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||
descriptors.addAll(BASE_CSV_DESCRIPTORS);
|
||||
descriptors.add(QUOTE);
|
||||
DESCRIPTORS = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
private volatile char quoteChar;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
super.onScheduled(context);
|
||||
this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return DESCRIPTORS;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
|
||||
GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar);
|
||||
AvroUtils.write(avroRecord, out);
|
||||
return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
|
||||
@Tags({ "csv", "json", "transform", "registry", "schema" })
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@CapabilityDescription("Transforms CSV content of the Flow File to JSON using the schema provided by the Schema Registry Service.")
|
||||
public final class TransformCSVToJson extends AbstractCSVTransformer {
|
||||
|
||||
private static final List<PropertyDescriptor> DESCRIPTORS;
|
||||
|
||||
static {
|
||||
List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||
descriptors.addAll(BASE_CSV_DESCRIPTORS);
|
||||
descriptors.add(QUOTE);
|
||||
DESCRIPTORS = Collections.unmodifiableList(descriptors);
|
||||
}
|
||||
|
||||
private volatile char quoteChar;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
@OnScheduled
|
||||
public void onScheduled(ProcessContext context) {
|
||||
super.onScheduled(context);
|
||||
this.quoteChar = context.getProperty(QUOTE).getValue().charAt(0);
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return DESCRIPTORS;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
|
||||
GenericRecord avroRecord = CSVUtils.read(in, this.delimiter, schema, this.quoteChar);
|
||||
JsonUtils.write(avroRecord, out);
|
||||
return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "application/json");
|
||||
}
|
||||
}
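A minimal sketch of the CSV-to-JSON conversion this processor performs (illustrative only, not part of the commit; assumes the same package since CSVUtils and JsonUtils are package-private, and uses a made-up schema and record):

package org.apache.nifi.schemaregistry.processors;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class CsvToJsonSketch {
    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"},"
                + "{\"name\":\"favorite_number\",\"type\":\"int\"},"
                + "{\"name\":\"favorite_color\",\"type\":\"string\"}]}");

        // '|' plays the role of the "CSV delimiter" property and '"' the "CSV quote character"
        ByteArrayInputStream in = new ByteArrayInputStream("John Dow|13|blue".getBytes());
        GenericRecord record = CSVUtils.read(in, '|', schema, '"');
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        JsonUtils.write(record, out);
        System.out.println(out); // {"name":"John Dow","favorite_number":13,"favorite_color":"blue"}
    }
}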
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
|
||||
@Tags({ "registry", "schema", "avro", "json", "transform" })
|
||||
@CapabilityDescription("Transforms JSON content of the Flow File to Avro using the schema provided by the Schema Registry Service.")
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
public final class TransformJsonToAvro extends AbstractContentTransformer {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
|
||||
GenericRecord avroRecord = JsonUtils.read(in, schema);
|
||||
AvroUtils.write(avroRecord, out);
|
||||
return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "binary/avro");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
|
||||
@Tags({ "registry", "schema", "csv", "json", "transform" })
|
||||
@CapabilityDescription("Transforms JSON content of the Flow File to CSV using the schema provided by the Schema Registry Service.")
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||
public final class TransformJsonToCSV extends AbstractCSVTransformer {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected Map<String, String> transform(InputStream in, OutputStream out, InvocationContextProperties contextProperties, Schema schema) {
|
||||
GenericRecord avroRecord = JsonUtils.read(in, schema);
|
||||
CSVUtils.write(avroRecord, this.delimiter, out);
|
||||
return Collections.singletonMap(CoreAttributes.MIME_TYPE.key(), "text/csv");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.schemaregistry.processors.TransformJsonToAvro
|
||||
org.apache.nifi.schemaregistry.processors.TransformAvroToJson
|
||||
org.apache.nifi.schemaregistry.processors.TransformCSVToAvro
|
||||
org.apache.nifi.schemaregistry.processors.TransformCSVToJson
|
||||
org.apache.nifi.schemaregistry.processors.TransformAvroToCSV
|
||||
org.apache.nifi.schemaregistry.processors.TransformJsonToCSV
|
||||
org.apache.nifi.schemaregistry.processors.ExtractAvroFields
|
|
@ -0,0 +1,188 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.processors;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.FileSystems;
|
||||
import java.nio.file.Files;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import junitparams.JUnitParamsRunner;
|
||||
import junitparams.Parameters;
|
||||
|
||||
@RunWith(JUnitParamsRunner.class)
|
||||
public class TransformersTest {
|
||||
|
||||
private final ClassLoader classLoader = getClass().getClassLoader();
|
||||
|
||||
@Test
|
||||
public void validateCSVtoAvroPair() throws Exception {
|
||||
String data = "John Dow|13|blue";
|
||||
String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
|
||||
+ "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
|
||||
+ "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
|
||||
+ "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
|
||||
|
||||
Schema schema = new Schema.Parser().parse(fooSchemaText);
|
||||
|
||||
// CSV -> AVRO -> CSV
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
|
||||
GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
AvroUtils.write(record, out);
|
||||
byte[] avro = out.toByteArray();
|
||||
|
||||
in = new ByteArrayInputStream(avro);
|
||||
record = AvroUtils.read(in, schema);
|
||||
out = new ByteArrayOutputStream();
|
||||
CSVUtils.write(record, '|', out);
|
||||
byte[] csv = out.toByteArray();
|
||||
assertEquals(data, new String(csv, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validateCSVtoJsonPair() throws Exception {
|
||||
String data = "John Dow|13|blue";
|
||||
String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
|
||||
+ "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
|
||||
+ "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
|
||||
+ "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
|
||||
|
||||
Schema schema = new Schema.Parser().parse(fooSchemaText);
|
||||
|
||||
// CSV -> JSON -> CSV
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
|
||||
GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
JsonUtils.write(record, out);
|
||||
byte[] json = out.toByteArray();
|
||||
|
||||
assertEquals("{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}", new String(json, StandardCharsets.UTF_8));
|
||||
|
||||
in = new ByteArrayInputStream(json);
|
||||
record = JsonUtils.read(in, schema);
|
||||
out = new ByteArrayOutputStream();
|
||||
CSVUtils.write(record, '|', out);
|
||||
byte[] csv = out.toByteArray();
|
||||
assertEquals(data, new String(csv, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void validateJsonToAvroPair() throws Exception {
|
||||
String data = "{\"name\":\"John Dow\",\"favorite_number\":13,\"favorite_color\":\"blue\"}";
|
||||
String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
|
||||
+ "\"fields\": [ " + "{\"name\": \"name\", \"type\": \"string\"}, "
|
||||
+ "{\"name\": \"favorite_number\", \"type\": \"int\"}, "
|
||||
+ "{\"name\": \"favorite_color\", \"type\": \"string\"} " + "]" + "}";
|
||||
|
||||
Schema schema = new Schema.Parser().parse(fooSchemaText);
|
||||
|
||||
// JSON -> AVRO -> JSON
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
|
||||
GenericRecord record = JsonUtils.read(in, schema);
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
AvroUtils.write(record, out);
|
||||
byte[] avro = out.toByteArray();
|
||||
|
||||
in = new ByteArrayInputStream(avro);
|
||||
record = AvroUtils.read(in, schema);
|
||||
out = new ByteArrayOutputStream();
|
||||
JsonUtils.write(record, out);
|
||||
byte[] json = out.toByteArray();
assertEquals(data, new String(json, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Parameters({"input_csv/union_null_last_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_last_field_with_default.txt",
|
||||
"input_csv/union_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_with_default.txt",
|
||||
"input_csv/union_null_middle_field_with_default.txt,input_avro/union_and_matching_defaults.txt,expected_ouput_csv/union_null_middle_field_with_default.txt",
|
||||
"input_csv/primitive_types.txt,input_avro/primitive_types_no_defaults.txt,expected_ouput_csv/primitive_types.txt",
|
||||
"input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_matching_default.txt,expected_ouput_csv/primitive_types_with_matching_default.txt",
|
||||
"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_valid_scale_with_no_default.txt,expected_ouput_csv/decimal_logicalType.txt",
|
||||
"input_csv/decimal_logicalType.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_invalid_scale.txt",
|
||||
"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_valid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_valid_scale_with_default.txt",
|
||||
"input_csv/decimal_logicalType_missing_value.txt,input_avro/decimal_logicalType_invalid_scale_with_default.txt,expected_ouput_csv/decimal_logicalType_with_default.txt"})
|
||||
public void testCSVRoundtrip(final String inputCSVFileName, final String inputAvroSchema, final String expectedOuput) throws Exception {
|
||||
|
||||
final String data = getResourceAsString(inputCSVFileName);
|
||||
final String schemaText = getResourceAsString(inputAvroSchema);
|
||||
final String result = getResourceAsString(expectedOuput);
|
||||
|
||||
Schema schema = new Schema.Parser().parse(schemaText);
|
||||
|
||||
|
||||
// CSV -> AVRO -> CSV
|
||||
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
|
||||
GenericRecord record = CSVUtils.read(in, '|', schema, '\"');
|
||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
AvroUtils.write(record, out);
|
||||
byte[] avro = out.toByteArray();
|
||||
|
||||
in = new ByteArrayInputStream(avro);
|
||||
record = AvroUtils.read(in, schema);
|
||||
out = new ByteArrayOutputStream();
|
||||
CSVUtils.write(record, '|', out);
|
||||
byte[] csv = out.toByteArray();
|
||||
assertEquals(result, new String(csv, StandardCharsets.UTF_8));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Parameters({"input_csv/union_with_missing_value.txt,input_avro/union_and_mismatch_defaults.txt",
|
||||
"input_csv/primitive_types_with_matching_default.txt,input_avro/primitive_types_with_mismatch_default.txt"})
|
||||
public void testCSVMismatchDefaults(final String inputCSVFileName, final String inputAvroSchema) {
|
||||
|
||||
try {
|
||||
final String data = getResourceAsString(inputCSVFileName);
|
||||
|
||||
final String schemaText = getResourceAsString(inputAvroSchema);
|
||||
|
||||
Schema schema = new Schema.Parser().parse(schemaText);
|
||||
|
||||
ByteArrayInputStream in = new ByteArrayInputStream(data.getBytes());
|
||||
CSVUtils.read(in, '|', schema, '\"');
|
||||
} catch (IOException ioe) {
assertTrue("Unexpected IOException: " + ioe.getMessage(), false);
} catch (IllegalArgumentException iae) {
// expected: mismatched default values must be rejected
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple wrapper around getting the test resource file that is used by the above test cases
|
||||
*
|
||||
* @param fileName - the filename of the file to read
|
||||
* @return A string that contains the body of the file.
|
||||
* @throws IOException - if an error occurs reading the file.
|
||||
*/
|
||||
private String getResourceAsString(String fileName) throws IOException {
|
||||
return new String(Files.readAllBytes(FileSystems.getDefault().getPath(classLoader.getResource(fileName).getPath())));
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
"fake_transactionid"|11234567.890
|
|
@ -0,0 +1 @@
|
|||
"fake_transactionid"|11234567.89
|
|
@ -0,0 +1 @@
|
|||
"fake_transactionid"|0.000
|
|
@ -0,0 +1 @@
|
|||
"fake_transactionid"|0
|
|
@ -0,0 +1 @@
|
|||
"this is a simple string."|10|21474836470|1.7976931348623157E308|true
|
|
@ -0,0 +1 @@
|
|||
"default_string"|1234|21474836470|1.7976931348623157E308|true
|
|
@ -0,0 +1 @@
|
|||
andrew|13|
|
|
@ -0,0 +1 @@
|
|||
andrew|21474|blue
|
|
@ -0,0 +1 @@
|
|||
andrew|13|blue
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"name": "trx_table",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "transactionid",
|
||||
"type": ["string", "null"]
|
||||
}, {
|
||||
"name": "amount",
|
||||
"type": "bytes",
|
||||
"logicalType": "decimal",
|
||||
"precision": 10,
|
||||
"scale": 13,
|
||||
"default": 0.0
|
||||
}]
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"name": "trx_table",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "transactionid",
|
||||
"type": ["string", "null"]
|
||||
}, {
|
||||
"name": "amount",
|
||||
"type": "bytes",
|
||||
"logicalType": "decimal",
|
||||
"precision": 10,
|
||||
"scale": 3,
|
||||
"default": 0.0
|
||||
}]
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
{
|
||||
"name": "trx_table",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "transactionid",
|
||||
"type": ["string", "null"]
|
||||
}, {
|
||||
"name": "amount",
|
||||
"type": "bytes",
|
||||
"logicalType": "decimal",
|
||||
"precision": 10,
|
||||
"scale": 3
|
||||
}]
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"type":"record",
|
||||
"name":"basic_primitive_type_check",
|
||||
"fields":[
|
||||
{"name":"string","type":"string"},
|
||||
{"name":"integer","type":"int"},
|
||||
{"name":"long","type":"long"},
|
||||
{"name":"double","type":"double"},
|
||||
{"name":"boolean","type":"boolean"}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"type":"record",
|
||||
"name":"basic_primitive_type_check",
|
||||
"fields":[
|
||||
{"name":"string","type":["null","string"],"default":null},
|
||||
{"name":"integer","type":["null","int"],"default":null},
|
||||
{"name":"long","type":["null","long"],"default":null},
|
||||
{"name":"double","type":["null","double"],"default":null},
|
||||
{"name":"boolean","type":["null","boolean"],"default":null}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"type":"record",
|
||||
"name":"basic_primitive_type_check",
|
||||
"fields":[
|
||||
{"name":"string","type":"string","default":"default_string"},
|
||||
{"name":"integer","type":"int","default":1234},
|
||||
{"name":"long","type":"long","default":21474836470},
|
||||
{"name":"double","type":"double","default":1.7976931348623157E308},
|
||||
{"name":"boolean","type":"boolean","default":true}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"type":"record",
|
||||
"name":"basic_primitive_type_check",
|
||||
"fields":[
|
||||
{"name":"string","type":"string","default":1234},
|
||||
{"name":"integer","type":"int","default":"mismatch_int"},
|
||||
{"name":"long","type":"long","default":"mismatch_long"},
|
||||
{"name":"double","type":"double","default":"mismatch_double"},
|
||||
{"name":"boolean","type":"boolean","default":"mismatch_boolean"}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
{
|
||||
"namespace": "example.avro",
|
||||
"type": "record",
|
||||
"name": "User",
|
||||
"fields": [{
|
||||
"name": "name",
|
||||
"type": "string",
|
||||
"default": "default_name"
|
||||
}, {
|
||||
"name": "favorite_number",
|
||||
"type": "int",
|
||||
"default": 21474
|
||||
}, {
|
||||
"name": "favorite_color",
|
||||
"type": ["null", "string"],
|
||||
"default": null
|
||||
}]
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
{
|
||||
"namespace": "example.avro",
|
||||
"type": "record",
|
||||
"name": "User",
|
||||
"fields": [{
|
||||
"name": "name",
|
||||
"type": "string",
|
||||
"default": "default_name"
|
||||
}, {
|
||||
"name": "favorite_number",
|
||||
"type": "int",
|
||||
"default": "mismatched_int_default"
|
||||
}, {
|
||||
"name": "favorite_color",
|
||||
"type": ["null", "string"],
|
||||
"default": null
|
||||
}]
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
"fake_transactionid"|11234567.890
|
|
@ -0,0 +1 @@
|
|||
"fake_transactionid"|
|
|
@ -0,0 +1 @@
|
|||
"this is a simple string."|10|21474836470|1.7976931348623157E308|true
|
|
@ -0,0 +1 @@
|
|||
"default_string"||21474836470||true
|
|
@ -0,0 +1 @@
|
|||
andrew|13|
|
|
@ -0,0 +1 @@
|
|||
andrew||blue
|
|
@ -0,0 +1 @@
|
|||
andrew|13|blue
|
|
@ -0,0 +1 @@
|
|||
andrew||blue
|
|
@ -0,0 +1,48 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!-- Copyright 2016 Hortoworks, Inc. All rights reserved. Hortonworks, Inc.
|
||||
licenses this file to you under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License. You may
|
||||
obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software distributed
|
||||
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
|
||||
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
|
||||
the specific language governing permissions and limitations under the License.
|
||||
See the associated NOTICE file for additional information regarding copyright
|
||||
ownership. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-registry-bundle</artifactId>
|
||||
<version>1.2.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-registry-service</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.services;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
|
||||
/**
|
||||
* Represents a {@link ControllerService} strategy for exposing an internal and/or
* integrating with an external Schema Registry.
|
||||
*/
|
||||
public interface SchemaRegistry extends ControllerService, AutoCloseable {
|
||||
|
||||
public static final String SCHEMA_NAME_ATTR = "schema.name";
|
||||
|
||||
|
||||
/**
|
||||
* Retrieves and returns the textual representation of the schema based on
|
||||
* the provided name of the schema available in Schema Registry. Will throw
|
||||
* a runtime exception if the schema cannot be found.
|
||||
*/
|
||||
String retrieveSchemaText(String schemaName);
|
||||
|
||||
/**
|
||||
* Retrieves and returns the textual representation of the schema based on
|
||||
* the provided name of the schema available in Schema Registry and optional
|
||||
* additional attributes. Will throw a runtime exception if the schema cannot
|
||||
* be found.
|
||||
*/
|
||||
String retrieveSchemaText(String schemaName, Properties attributes);
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.services;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
|
||||
@Tags({ "schema", "registry", "avro", "json", "csv" })
|
||||
@CapabilityDescription("Provides a service for registering and accessing schemas. You can register schema "
|
||||
+ "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
|
||||
+ "representation of the actual schema.")
|
||||
public class SimpleKeyValueSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
|
||||
|
||||
private static final List<PropertyDescriptor> propertyDescriptors;
|
||||
|
||||
static {
|
||||
propertyDescriptors = Collections.emptyList();
|
||||
}
|
||||
|
||||
private final Map<String, String> schemaNameToSchemaMap;
|
||||
|
||||
public SimpleKeyValueSchemaRegistry() {
|
||||
this.schemaNameToSchemaMap = new HashMap<>();
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void enable(ConfigurationContext configurationContext) throws InitializationException {
this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream()
|
||||
.filter(propEntry -> propEntry.getKey().isDynamic())
|
||||
.collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
public String retrieveSchemaText(String schemaName) {
|
||||
if (!this.schemaNameToSchemaMap.containsKey(schemaName)) {
|
||||
throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + ".");
|
||||
} else {
|
||||
return this.schemaNameToSchemaMap.get(schemaName);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String retrieveSchemaText(String schemaName, Properties attributes) {
|
||||
throw new UnsupportedOperationException("This version of schema registry does not "
|
||||
+ "support this operation, since schemas are only identofied by name.");
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnDisabled
|
||||
public void close() throws Exception {
|
||||
this.schemaNameToSchemaMap.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true).expressionLanguageSupported(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return propertyDescriptors;
|
||||
}
|
||||
}
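For illustration (hypothetical property values, not part of the commit): to register a schema with this service, add a dynamic property to the controller service whose name is the schema name (for example "users") and whose value is the Avro schema text, along the lines of the schemas used in the bundled tests:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int"},
    {"name": "favorite_color", "type": "string"}
  ]
}

A Transform* or ExtractAvroFields processor then resolves it by setting its "Schema Name" property to the same name ("users" in this hypothetical).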
|
|
@ -0,0 +1,15 @@
|
|||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry
|
|
@ -0,0 +1,70 @@
|
|||
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.schemaregistry.services;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.schemaregistry.services.SimpleKeyValueSchemaRegistry;
import org.junit.Test;

public class SimpleKeyValueSchemaRegistryTest {

    @Test
    public void validateSchemaRegistrationFromDynamicProperties() throws Exception {
        String schemaName = "fooSchema";
        ConfigurationContext configContext = mock(ConfigurationContext.class);
        Map<PropertyDescriptor, String> properties = new HashMap<>();
        PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
                .name(schemaName)
                .dynamic(true)
                .build();
        String fooSchemaText = "{\"namespace\": \"example.avro\", " + "\"type\": \"record\", " + "\"name\": \"User\", "
                + "\"fields\": [ " + "{\"name\": \"name\", \"type\": [\"string\", \"null\"]}, "
                + "{\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]}, "
                + "{\"name\": \"foo\", \"type\": [\"int\", \"null\"]}, "
                + "{\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]} " + "]" + "}";
        PropertyDescriptor barSchema = new PropertyDescriptor.Builder()
                .name("barSchema")
                .dynamic(false)
                .build();
        properties.put(fooSchema, fooSchemaText);
        properties.put(barSchema, "");
        when(configContext.getProperties()).thenReturn(properties);
        SchemaRegistry delegate = new SimpleKeyValueSchemaRegistry();
        ((SimpleKeyValueSchemaRegistry) delegate).enable(configContext);

        String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
        assertEquals(fooSchemaText, locatedSchemaText);
        try {
            locatedSchemaText = delegate.retrieveSchemaText("barSchema");
            fail();
        } catch (Exception e) {
            // expected: "barSchema" is not a dynamic property, so it was never registered
        }
        delegate.close();
    }

}

@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
    license agreements. See the NOTICE file distributed with this work for additional
    information regarding copyright ownership. The ASF licenses this file to
    You under the Apache License, Version 2.0 (the "License"); you may not use
    this file except in compliance with the License. You may obtain a copy of
    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
    by applicable law or agreed to in writing, software distributed under the
    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
    OF ANY KIND, either express or implied. See the License for the specific
    language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-bundles</artifactId>
        <version>1.2.0-SNAPSHOT</version>
    </parent>
    <artifactId>nifi-registry-bundle</artifactId>
    <packaging>pom</packaging>
    <description>A bundle of processors that rely on an external service to obtain a schema.</description>

    <properties>
        <commons-lang3.version>3.0</commons-lang3.version>
    </properties>

    <modules>
        <module>nifi-registry-processors</module>
        <module>nifi-registry-service</module>
        <module>nifi-registry-nar</module>
    </modules>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-registry-processors</artifactId>
                <version>1.2.0-SNAPSHOT</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

@@ -74,6 +74,7 @@
        <module>nifi-websocket-bundle</module>
        <module>nifi-tcp-bundle</module>
        <module>nifi-gcp-bundle</module>
        <module>nifi-registry-bundle</module>
    </modules>

    <dependencyManagement>

pom.xml

@@ -1144,6 +1144,12 @@ language governing permissions and limitations under the License. -->
            <version>1.2.0-SNAPSHOT</version>
            <type>nar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-registry-nar</artifactId>
            <version>1.2.0-SNAPSHOT</version>
            <type>nar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mqtt-nar</artifactId>