From 8b39a65d5d645cb1a66e0ac42442f9e28d14b74e Mon Sep 17 00:00:00 2001
From: Mark Bathori
Date: Tue, 16 Jan 2024 11:44:02 +0100
Subject: [PATCH] NIFI-12614: Create record reader service for Protobuf
messages
This closes #8250.
Signed-off-by: Tamas Palfy
---
nifi-assembly/pom.xml | 6 +
.../nifi-protobuf-services-nar/pom.xml | 42 ++
.../src/main/resources/META-INF/LICENSE | 210 ++++++++++
.../src/main/resources/META-INF/NOTICE | 92 +++++
.../nifi-protobuf-services/pom.xml | 86 ++++
.../nifi/services/protobuf/FieldType.java | 57 +++
.../services/protobuf/ProtobufReader.java | 177 +++++++++
.../protobuf/ProtobufRecordReader.java | 65 +++
.../protobuf/converter/ProtoField.java | 53 +++
.../converter/ProtobufDataConverter.java | 371 ++++++++++++++++++
.../protobuf/schema/ProtoSchemaParser.java | 159 ++++++++
.../protobuf/schema/ProtoSchemaStrategy.java | 49 +++
.../validation/ProtoValidationResource.java | 38 ++
...g.apache.nifi.controller.ControllerService | 16 +
.../additionalDetails.html | 189 +++++++++
.../nifi/services/protobuf/ProtoTestUtil.java | 144 +++++++
.../protobuf/TestProtobufRecordReader.java | 145 +++++++
.../converter/TestProtobufDataConverter.java | 106 +++++
.../schema/TestProtoSchemaParser.java | 84 ++++
.../test/resources/google/protobuf/any.desc | 7 +
.../src/test/resources/test_proto2.desc | 11 +
.../src/test/resources/test_proto2.proto | 35 ++
.../src/test/resources/test_proto3.desc | Bin 0 -> 1022 bytes
.../src/test/resources/test_proto3.proto | 53 +++
nifi-nar-bundles/nifi-protobuf-bundle/pom.xml | 110 ++++++
nifi-nar-bundles/pom.xml | 1 +
26 files changed, 2306 insertions(+)
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaParser.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/schema/ProtoSchemaStrategy.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/validation/ProtoValidationResource.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/resources/docs/org.apache.nifi.services.protobuf.ProtobufReader/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/ProtoTestUtil.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/TestProtobufRecordReader.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/converter/TestProtobufDataConverter.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/java/org/apache/nifi/services/protobuf/schema/TestProtoSchemaParser.java
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/google/protobuf/any.desc
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.desc
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto2.proto
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.desc
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/test/resources/test_proto3.proto
create mode 100644 nifi-nar-bundles/nifi-protobuf-bundle/pom.xml
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index c2d91f7bb0..f30f8eae55 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -884,6 +884,12 @@ language governing permissions and limitations under the License. -->
2.0.0-SNAPSHOTnar
+
+ org.apache.nifi
+ nifi-protobuf-services-nar
+ 2.0.0-SNAPSHOT
+ nar
+ org.aspectj
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
new file mode 100644
index 0000000000..6eb9b9e8dc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/pom.xml
@@ -0,0 +1,42 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-protobuf-bundle
+ 2.0.0-SNAPSHOT
+
+
+ nifi-protobuf-services-nar
+ nar
+
+
+
+ org.apache.nifi
+ nifi-protobuf-services
+ 2.0.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-standard-shared-nar
+ nar
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000000..84fb21549a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,210 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+APACHE NIFI SUBCOMPONENTS:
+
+The Apache NiFi project contains subcomponents with separate copyright
+notices and license terms. Your use of the source code for these
+subcomponents is subject to the terms and conditions of the following
+licenses.
+
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000000..b57ec55464
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,92 @@
+nifi-protobuf-services-nar
+Copyright 2014-2023 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+ (ASLv2) Wire
+ The following NOTICE information applies:
+ Wire
+ Copyright 2013 Square, Inc.
+
+ (ASLv2) KotlinPoet
+ The following NOTICE information applies:
+ KotlinPoet
+ Copyright 2017 Square, Inc.
+
+ (ASLv2) Guava: Google Core Libraries For Java
+ The following NOTICE information applies:
+ Guava: Google Core Libraries For Java
+ Copyright (C) 2017 The Guava Authors
+
+ (ASLv2) J2ObjC Annotations
+ The following NOTICE information applies:
+ J2ObjC Annotations
+ Copyright 2022 The J2ObjC Annotations Authors
+
+ (ASLv2) FindBugs JSR305
+ The following NOTICE information applies:
+ FindBugs JSR305
+ Copyright 2017 The FindBugs JSR305 Authors
+
+ (ASLv2) Guava ListenableFuture Only
+ The following NOTICE information applies:
+ Guava ListenableFuture Only
+ Copyright (C) 2018 The Guava Authors
+
+ (ASLv2) Error Prone Annotations
+ The following NOTICE information applies:
+ Error Prone Annotations
+ Copyright 2015 The Error Prone Authors
+
+ (ASLv2) Guava InternalFutureFailureAccess and InternalFutures
+ The following NOTICE information applies:
+ Guava InternalFutureFailureAccess and InternalFutures
+ Copyright (C) 2018 The Guava Authors
+
+ (ASLv2) Okio
+ The following NOTICE information applies:
+ Okio
+ Copyright 2013 Square, Inc.
+
+ (ASLv2) JavaPoet
+ Copyright 2015 Square, Inc.
+
+ (ASLv2) Apache Commons CSV
+ The following NOTICE information applies:
+ Apache Commons CSV
+ Copyright 2005-2016 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Avro
+ The following NOTICE information applies:
+ Apache Avro
+ Copyright 2009-2017 The Apache Software Foundation
+
+===========================================
+MIT License
+===========================================
+
+ (MIT) Checker Qual
+ The following NOTICE information applies:
+
+ Copyright (c) Copyright 2004-present by the Checker Framework developers
+ All rights reserved.
+ https://www.checkerframework.org/
+
+************************
+BSD License
+************************
+
+The following binary components are provided under the BSD License. See project link for details.
+
+ (BSD 3-Clause) Protocol Buffers
+ The following NOTICE information applies:
+ Copyright 2008 Google Inc. All rights reserved.
+ https://github.com/google/protobuf/tree/master/java
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
new file mode 100644
index 0000000000..906173d907
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/pom.xml
@@ -0,0 +1,86 @@
+
+
+
+ 4.0.0
+
+ org.apache.nifi
+ nifi-protobuf-bundle
+ 2.0.0-SNAPSHOT
+
+
+ nifi-protobuf-services
+
+
+ 3.25.1
+ 4.9.3
+
+
+
+
+
+ org.apache.nifi
+ nifi-api
+
+
+ org.apache.nifi
+ nifi-utils
+
+
+ org.apache.nifi
+ nifi-record-serialization-service-api
+
+
+ org.apache.nifi
+ nifi-schema-registry-service-api
+
+
+ org.apache.nifi
+ nifi-avro-record-utils
+ 2.0.0-SNAPSHOT
+
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+ ${protobuf.version}
+
+
+ com.squareup.wire
+ wire-schema-jvm
+ ${wire.version}
+
+
+
+
+
+ org.apache.nifi
+ nifi-mock
+ 2.0.0-SNAPSHOT
+ test
+
+
+ org.apache.nifi
+ nifi-mock-record-utils
+ 2.0.0-SNAPSHOT
+ test
+
+
+
+
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java
new file mode 100644
index 0000000000..71865cad09
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/FieldType.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import java.util.Arrays;
+
+/**
+ * Scalar field types of the Protocol Buffers language.
+ * <p>
+ * Every constant is paired with the keyword that names the type, and
+ * {@link #findValue(String)} resolves such a keyword back to its constant.
+ */
+public enum FieldType {
+    DOUBLE("double"),
+    FLOAT("float"),
+    INT32("int32"),
+    INT64("int64"),
+    UINT32("uint32"),
+    UINT64("uint64"),
+    SINT32("sint32"),
+    SINT64("sint64"),
+    FIXED32("fixed32"),
+    FIXED64("fixed64"),
+    SFIXED32("sfixed32"),
+    SFIXED64("sfixed64"),
+    BOOL("bool"),
+    STRING("string"),
+    BYTES("bytes");
+
+    /** Keyword naming this type, for example {@code "int32"}. */
+    private final String keyword;
+
+    FieldType(final String keyword) {
+        this.keyword = keyword;
+    }
+
+    /**
+     * @return the keyword this constant was declared with
+     */
+    public String getType() {
+        return keyword;
+    }
+
+    /**
+     * Resolves a keyword to its constant, comparing case-insensitively.
+     *
+     * @param value keyword to look up
+     * @return the first constant, in declaration order, whose keyword matches
+     * @throws IllegalArgumentException if no constant has a matching keyword
+     */
+    public static FieldType findValue(final String value) {
+        return Arrays.stream(values())
+                .filter(candidate -> candidate.keyword.equalsIgnoreCase(value))
+                .findFirst()
+                .orElseThrow(() -> notFound(value));
+    }
+
+    /** Builds the exception reported for a keyword that matches no constant. */
+    private static IllegalArgumentException notFound(final String value) {
+        return new IllegalArgumentException(String.format("ProtoType [%s] not found", value));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
new file mode 100644
index 0000000000..d5b97d9788
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufReader.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.squareup.wire.schema.CoreLoaderKt;
+import com.squareup.wire.schema.Location;
+import com.squareup.wire.schema.Schema;
+import com.squareup.wire.schema.SchemaLoader;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy;
+import org.apache.nifi.services.protobuf.validation.ProtoValidationResource;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Record reader factory for Protocol Buffers messages.
+ * <p>
+ * The proto files of the configured directory are loaded into a Wire {@link Schema} when the service is enabled,
+ * and every reader created afterwards receives that schema together with the configured message type. Besides the
+ * schema access strategies inherited from {@link SchemaRegistryService}, the record schema can be generated from
+ * the proto definition, which is the default strategy.
+ */
+@Tags({"protobuf", "record", "reader", "parser"})
+@CapabilityDescription("Parses a Protocol Buffers message from binary format.")
+public class ProtobufReader extends SchemaRegistryService implements RecordReaderFactory {
+
+    // Proto definitions registered as additional schema roots, each located relative to CoreLoaderKt.WIRE_RUNTIME_JAR
+    private static final String ANY_PROTO = "google/protobuf/any.proto";
+    private static final String DURATION_PROTO = "google/protobuf/duration.proto";
+    private static final String EMPTY_PROTO = "google/protobuf/empty.proto";
+    private static final String STRUCT_PROTO = "google/protobuf/struct.proto";
+    private static final String TIMESTAMP_PROTO = "google/protobuf/timestamp.proto";
+    private static final String WRAPPERS_PROTO = "google/protobuf/wrappers.proto";
+
+    // Schema access strategy contributed by this reader; it is also the default one
+    private static final AllowableValue GENERATE_FROM_PROTO_FILE = new AllowableValue("generate-from-proto-file",
+            "Generate from Proto file", "The record schema is generated from the provided proto file");
+
+    // Populated in onEnabled and handed to every reader and to the proto-based schema strategy
+    private volatile String messageType;
+    private volatile Schema protoSchema;
+
+    // Holder of cached proto information so validation does not reload the same proto file over and over
+    private final AtomicReference<ProtoValidationResource> validationResourceHolder = new AtomicReference<>();
+
+    public static final PropertyDescriptor PROTOBUF_DIRECTORY = new PropertyDescriptor.Builder()
+            .name("Proto Directory")
+            .displayName("Proto Directory")
+            .description("Directory containing Protocol Buffers message definition (.proto) file(s).")
+            .required(true)
+            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder()
+            .name("Message Type")
+            .displayName("Message Type")
+            .description("Fully qualified name of the Protocol Buffers message type including its package (eg. mypackage.MyMessage). " +
+                    "The .proto files configured in '" + PROTOBUF_DIRECTORY.getDisplayName() + "' must contain the definition of this message type.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
+            .build();
+
+    /**
+     * Appends the proto directory and message type properties to those declared by the parent service.
+     *
+     * @return the inherited descriptors followed by {@link #PROTOBUF_DIRECTORY} and {@link #MESSAGE_TYPE}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(PROTOBUF_DIRECTORY);
+        properties.add(MESSAGE_TYPE);
+        return properties;
+    }
+
+    /**
+     * Verifies that the configured message type is defined by the proto files of the configured directory.
+     * The check is skipped while either property has no value.
+     *
+     * @param validationContext context giving access to the property values under validation
+     * @return a single invalid result when the message type cannot be found, otherwise an empty collection
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        // NOTE(review): super.customValidate is not invoked here - confirm no parent-level checks are expected to run
+        final List<ValidationResult> problems = new ArrayList<>();
+        final String protoDirectory = validationContext.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
+        final String messageType = validationContext.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
+
+        if (protoDirectory != null && messageType != null) {
+            final Schema protoSchema = getSchemaForValidation(protoDirectory);
+            if (protoSchema.getType(messageType) == null) {
+                problems.add(new ValidationResult.Builder()
+                        .subject(MESSAGE_TYPE.getDisplayName())
+                        .valid(false)
+                        .explanation(String.format("'%s' message type cannot be found in the provided proto files.", messageType))
+                        .build());
+            }
+        }
+
+        return problems;
+    }
+
+    /**
+     * Resolves the message type and loads the proto schema when the service gets enabled.
+     *
+     * @param context configuration the property values are read from
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final String protoDirectory = context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
+        messageType = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
+        protoSchema = loadProtoSchema(protoDirectory);
+    }
+
+    /**
+     * Returns the proto-based strategy when it is the selected one and defers to {@link SchemaAccessUtils} otherwise.
+     *
+     * @param allowableValue value of the selected schema access strategy
+     * @param schemaRegistry schema registry passed on to the standard strategies
+     * @param context property context passed on to the standard strategies
+     * @return the strategy used to obtain the record schema
+     */
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
+        if (allowableValue.equalsIgnoreCase(GENERATE_FROM_PROTO_FILE.getValue())) {
+            return new ProtoSchemaStrategy(messageType, protoSchema);
+        }
+
+        return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
+    }
+
+    /**
+     * @return the inherited schema access strategies extended with the proto-based one
+     */
+    @Override
+    protected List<AllowableValue> getSchemaAccessStrategyValues() {
+        final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
+        allowableValues.add(GENERATE_FROM_PROTO_FILE);
+        return allowableValues;
+    }
+
+    /**
+     * @return the proto-based strategy, used unless another one is selected
+     */
+    @Override
+    protected AllowableValue getDefaultSchemaAccessStrategy() {
+        return GENERATE_FROM_PROTO_FILE;
+    }
+
+    /**
+     * Creates a reader over the given stream using the proto schema and message type captured in {@link #onEnabled}.
+     *
+     * @param variables values forwarded to the record schema lookup
+     * @param in stream holding the serialized message
+     * @param inputLength length of the input; not used by this implementation
+     * @param logger component logger; not used by this implementation
+     * @return a reader for the message carried by the stream
+     * @throws IOException propagated from the record schema lookup
+     * @throws SchemaNotFoundException propagated from the record schema lookup
+     */
+    @Override
+    public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
+        return new ProtobufRecordReader(protoSchema, messageType, in, getSchema(variables, in, null));
+    }
+
+    /**
+     * Builds the schema from the given directory and the additional Google proto roots.
+     *
+     * @param protoDirectory directory used as the first schema root
+     * @return the schema produced by the Wire schema loader
+     */
+    private Schema loadProtoSchema(final String protoDirectory) {
+        final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
+        schemaLoader.initRoots(Arrays.asList(Location.get(protoDirectory),
+                Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, ANY_PROTO),
+                Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, DURATION_PROTO),
+                Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, EMPTY_PROTO),
+                Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, STRUCT_PROTO),
+                Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, TIMESTAMP_PROTO),
+                Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, WRAPPERS_PROTO)), Collections.emptyList());
+        return schemaLoader.loadSchema();
+    }
+
+    /**
+     * Returns the schema used by validation, reloading it only when the proto directory differs from the cached one.
+     *
+     * @param protoDirectory directory whose proto files are validated against
+     * @return the cached or freshly loaded schema
+     */
+    private Schema getSchemaForValidation(final String protoDirectory) {
+        ProtoValidationResource validationResource = validationResourceHolder.get();
+        if (validationResource == null || !protoDirectory.equals(validationResource.getProtoDirectory())) {
+            validationResource = new ProtoValidationResource(protoDirectory, loadProtoSchema(protoDirectory));
+            validationResourceHolder.set(validationResource);
+        }
+
+        return validationResource.getProtoSchema();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java
new file mode 100644
index 0000000000..ab2d6fdbd7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/ProtobufRecordReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf;
+
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.protobuf.converter.ProtobufDataConverter;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link RecordReader} that converts the content of its input stream into one Record.
+ * Once that Record has been produced, every further read returns {@code null}.
+ */
+public class ProtobufRecordReader implements RecordReader {
+
+    private final Schema protoSchema;
+    private final String messageType;
+    private final InputStream inputStream;
+    private RecordSchema recordSchema;
+    private boolean inputProcessed;
+
+    /**
+     * @param protoSchema  proto schema handed to the data converter
+     * @param messageType  name of the message type handed to the data converter
+     * @param inputStream  stream the record is read from; closed by {@link #close()}
+     * @param recordSchema record schema reported until a record has been read
+     */
+    public ProtobufRecordReader(final Schema protoSchema, final String messageType, final InputStream inputStream, final RecordSchema recordSchema) {
+        this.protoSchema = protoSchema;
+        this.messageType = messageType;
+        this.inputStream = inputStream;
+        this.recordSchema = recordSchema;
+    }
+
+    /**
+     * Converts the input stream into a record, unless that has already happened.
+     *
+     * @param coerceTypes       forwarded to the data converter
+     * @param dropUnknownFields forwarded to the data converter
+     * @return the converted record, or {@code null} once the input has been processed
+     * @throws IOException if the conversion fails
+     */
+    @Override
+    public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException {
+        if (inputProcessed) {
+            return null;
+        }
+
+        final ProtobufDataConverter converter = new ProtobufDataConverter(protoSchema, messageType, recordSchema, coerceTypes, dropUnknownFields);
+        final Record converted = converter.createRecord(inputStream);
+        inputProcessed = true;
+        // From now on report the schema of the record that was actually produced.
+        recordSchema = converted.getSchema();
+        return converted;
+    }
+
+    /**
+     * @return the schema passed to the constructor, or the schema of the record once one has been read
+     */
+    @Override
+    public RecordSchema getSchema() {
+        return recordSchema;
+    }
+
+    /**
+     * Closes the underlying input stream.
+     */
+    @Override
+    public void close() throws IOException {
+        inputStream.close();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
new file mode 100644
index 0000000000..1481849280
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtoField.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.converter;
+
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.ProtoType;
+
+/**
+ * Holder for the properties of a proto field: its name, its proto type and whether it is repeatable.
+ */
+public class ProtoField {
+
+    private final String fieldName;
+    private final ProtoType protoType;
+    private final boolean repeatable;
+
+    private ProtoField(final String fieldName, final ProtoType protoType, final boolean repeatable) {
+        this.fieldName = fieldName;
+        this.protoType = protoType;
+        this.repeatable = repeatable;
+    }
+
+    /**
+     * Takes name, type and the repeated flag from the given schema field.
+     *
+     * @param field schema field to copy the properties from
+     */
+    public ProtoField(final Field field) {
+        this(field.getName(), field.getType(), field.isRepeated());
+    }
+
+    /**
+     * Creates a field that is not repeatable.
+     *
+     * @param fieldName name of the field
+     * @param protoType proto type of the field
+     */
+    public ProtoField(final String fieldName, final ProtoType protoType) {
+        this(fieldName, protoType, false);
+    }
+
+    /** @return name of the field */
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    /** @return proto type of the field */
+    public ProtoType getProtoType() {
+        return protoType;
+    }
+
+    /** @return {@code true} if the field is repeatable */
+    public boolean isRepeatable() {
+        return repeatable;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java
new file mode 100644
index 0000000000..df5c491fa9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-protobuf-bundle/nifi-protobuf-services/src/main/java/org/apache/nifi/services/protobuf/converter/ProtobufDataConverter.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.protobuf.converter;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import com.squareup.wire.schema.EnumType;
+import com.squareup.wire.schema.Field;
+import com.squareup.wire.schema.MessageType;
+import com.squareup.wire.schema.OneOf;
+import com.squareup.wire.schema.ProtoType;
+import com.squareup.wire.schema.Schema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.services.protobuf.FieldType;
+import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static com.google.protobuf.CodedInputStream.decodeZigZag32;
+import static com.google.protobuf.TextFormat.unsignedToString;
+
+/**
+ * Creates a Record by mapping the fields of the provided proto schema to the unknown fields parsed from the encoded proto data.
+ */
+public class ProtobufDataConverter {
+
+ public static final String MAP_KEY_FIELD_NAME = "key"; // NOTE(review): presumably the key field of a map entry - usage is outside this view
+ public static final String MAP_VALUE_FIELD_NAME = "value"; // NOTE(review): presumably the value field of a map entry - usage is outside this view
+ public static final String ANY_TYPE_URL_FIELD_NAME = "type_url"; // NOTE(review): presumably the type URL field of an Any message - confirm
+ public static final String ANY_VALUE_FIELD_NAME = "value"; // NOTE(review): presumably the payload field of an Any message - confirm
+ public static final String ANY_MESSAGE_TYPE = "google.protobuf.Any"; // compared with a message's type name to detect Any messages
+
+ private final Schema schema; // proto schema in which the root message type is looked up
+ private final String rootMessageType; // name of the root message type
+ private final RecordSchema rootRecordSchema; // record schema given to the root record
+ private final boolean coerceTypes; // NOTE(review): not read in the visible part of this class - confirm usage
+ private final boolean dropUnknownFields; // passed to every MapRecord created from message fields
+
+ private boolean containsAnyField = false; // set once an Any message is converted; makes the root record regenerate its schema
+
+ /**
+  * Creates a converter for one root message type.
+  *
+  * @param schema            proto schema containing the message definitions
+  * @param messageType       name of the root message type
+  * @param recordSchema      record schema used for the root record
+  * @param coerceTypes       type coercion flag stored for the conversion
+  * @param dropUnknownFields flag passed on to the created records
+  */
+ public ProtobufDataConverter(final Schema schema, final String messageType, final RecordSchema recordSchema,
+                              final boolean coerceTypes, final boolean dropUnknownFields) {
+     this.schema = schema;
+     this.rootMessageType = messageType;
+     this.rootRecordSchema = recordSchema;
+     this.coerceTypes = coerceTypes;
+     this.dropUnknownFields = dropUnknownFields;
+ }
+
+ /**
+  * Creates a record from the root message.
+  *
+  * @param data stream providing the encoded root message
+  * @return created record
+  * @throws IOException failed to read input stream
+  * @throws NullPointerException if the root message type is not present in the proto schema
+  */
+ public MapRecord createRecord(InputStream data) throws IOException {
+     final MessageType rootType = (MessageType) schema.getType(rootMessageType);
+     // Supplier overload: the message is only formatted when the lookup actually failed.
+     Objects.requireNonNull(rootType, () -> String.format("Message with name [%s] not found in the provided proto files", rootMessageType));
+
+     final MapRecord record = createRecord(rootType, ByteString.readFrom(data), rootRecordSchema);
+     if (containsAnyField) {
+         // An Any message was converted, so the schema is rebuilt from the record that was produced.
+         record.regenerateSchema();
+     }
+
+     return record;
+ }
+
+ /**
+  * Creates a record for the provided message.
+  *
+  * @param messageType message to create a record from
+  * @param data proto message data
+  * @param recordSchema record schema for the created record
+  * @return created record
+  * @throws InvalidProtocolBufferException failed to parse input data
+  */
+ private MapRecord createRecord(MessageType messageType, ByteString data, RecordSchema recordSchema) throws InvalidProtocolBufferException {
+     final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(data);
+
+     if (ANY_MESSAGE_TYPE.equals(messageType.getType().toString())) {
+         // Remember that an Any message was seen so the root record regenerates its schema afterwards.
+         containsAnyField = true;
+         return handleAnyField(unknownFieldSet);
+     }
+
+     final Map<String, Object> fieldValues = processMessageFields(messageType, unknownFieldSet);
+     return new MapRecord(recordSchema, fieldValues, false, dropUnknownFields);
+ }
+
+ /**
+ * Process declared, extension and oneOf fields in the provided message.
+ *
+ * @param messageType message with fields to be processed
+ * @param unknownFieldSet received proto data fields
+ * @return Map of processed fields
+ */
+ private Map processMessageFields(MessageType messageType, UnknownFieldSet unknownFieldSet) throws InvalidProtocolBufferException {
+ Map recordValues = new HashMap<>();
+
+ for (final Field field : messageType.getDeclaredFields()) {
+ collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()));
+ }
+
+ for (final Field field : messageType.getExtensionFields()) {
+ collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()));
+ }
+
+ for (final OneOf oneOf : messageType.getOneOfs()) {
+ for (Field field : oneOf.getFields()) {
+ collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()));
+ }
+ }
+ return recordValues;
+ }
+
+ /**
+ * Checks the field value's presence and sets it into the result Map.
+ *
+ * @param fieldNameToConvertedValue Map of converted values
+ * @param protoField proto field's properties
+ * @param unknownField field's value
+ */
+ private void collectFieldValue(Map fieldNameToConvertedValue, ProtoField protoField, UnknownFieldSet.Field unknownField) throws InvalidProtocolBufferException {
+ final Optional
+
+
+ Composite Value Types
+
+
+
+
+
Proto Type
Proto Wire Type
NiFi Data Type
+
message
length delimited
record
+
enum
varint
enum
+
map
length delimited
map
+
oneof
-
choice
+
+
+
+
Schemas and Type Coercion
+
+
+ When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
+ configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
+ the schema, that field will be stored in the Record's value list with its original type. If the field is found in the schema,
+ the data type of the received data is compared against the data type specified in the schema. If the types match, the value
+ of that field is used as-is. If the schema indicates that the field should be of a different type, then the Controller Service
+ will attempt to coerce the data into the type specified by the schema. If the field cannot be coerced into the specified type,
+ an Exception will be thrown.
+
+
+
+ The following rules apply when attempting to coerce a field value from one data type to another:
+
+
+
+
Any data type can be coerced into a String type.
+
Any numeric data type (Int, Long, Float, Double) can be coerced into any other numeric data type.
+
Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
+ milliseconds since epoch (Midnight GMT, January 1, 1970).
+
A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
+ or "Timestamp Format."
+
A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
+ 8 can be coerced into any numeric type. However, the String value 8.2 can be coerced into a Double or Float
+ type but not an Integer.
+
A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.
+
A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
+ and the rest of the characters are ignored.
+
+
+
+ If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
+ will be thrown.
+
+
+
Schema Access Strategy
+
+
+ Besides the common Schema Access strategies, like getting the schema from a property value or accessing it from a Schema Registry,
+ the ProtobufReader Controller Service offers another access strategy option called "Generate from Proto file". When using this strategy,
+ the Reader will generate the Record Schema from the provided '.proto' file and Message type. This is a recommended strategy when the user
+ doesn't want to manually create the schema or when no type coercion is needed.
+
+
+
+
Protobuf Any Field Type
+
+
+ Protocol Buffers offers further Message types called Well-Known Types. These are additionally provided messages that define
+ complex structured types and wrappers for scalar types. The Any type is one of these Well-Known Types which is used to store an arbitrary
+ serialized Message along with a URL that describes the type of the serialized Message. Since the Message type and the embedded Message will be
+ available only when the Any Message is already populated with data, the ProtobufReader needs to do this Message processing at data conversion time.
+ The Reader is capable of generating a schema for the embedded Message in the Any field and replacing it in the result Record schema.
+
+
+
Example
+
+
+ There is a Message called 'TestMessage' which has only one field that is an Any typed field. There is another Message called 'NestedMessage'
+ that we would like to add as a serialized Message in the value of 'anyField'.
+