NIFI-12614: Create record reader service for Protobuf messages

This closes #8250.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
Mark Bathori 2024-01-16 11:44:02 +01:00 committed by tpalfy
parent 90d0f6317a
commit 8b39a65d5d
26 changed files with 2306 additions and 0 deletions


@@ -884,6 +884,12 @@ language governing permissions and limitations under the License. -->
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-protobuf-services-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<!-- AspectJ library needed by the Java Agent used for native library loading (see bootstrap.conf) -->
<dependency>
<groupId>org.aspectj</groupId>


@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-protobuf-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-protobuf-services-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-protobuf-services</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-shared-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,210 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
APACHE NIFI SUBCOMPONENTS:
The Apache NiFi project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
subcomponents is subject to the terms and conditions of the following
licenses.


@@ -0,0 +1,92 @@
nifi-protobuf-services-nar
Copyright 2014-2023 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
(ASLv2) Wire
The following NOTICE information applies:
Wire
Copyright 2013 Square, Inc.
(ASLv2) KotlinPoet
The following NOTICE information applies:
KotlinPoet
Copyright 2017 Square, Inc.
(ASLv2) Guava: Google Core Libraries For Java
The following NOTICE information applies:
Guava: Google Core Libraries For Java
Copyright (C) 2017 The Guava Authors
(ASLv2) J2ObjC Annotations
The following NOTICE information applies:
J2ObjC Annotations
Copyright 2022 The J2ObjC Annotations Authors
(ASLv2) FindBugs JSR305
The following NOTICE information applies:
FindBugs JSR305
Copyright 2017 The FindBugs JSR305 Authors
(ASLv2) Guava ListenableFuture Only
The following NOTICE information applies:
Guava ListenableFuture Only
Copyright (C) 2018 The Guava Authors
(ASLv2) Error Prone Annotations
The following NOTICE information applies:
Error Prone Annotations
Copyright 2015 The Error Prone Authors
(ASLv2) Guava InternalFutureFailureAccess and InternalFutures
The following NOTICE information applies:
Guava InternalFutureFailureAccess and InternalFutures
Copyright (C) 2018 The Guava Authors
(ASLv2) Okio
The following NOTICE information applies:
Okio
Copyright 2013 Square, Inc.
(ASLv2) JavaPoet
Copyright 2015 Square, Inc.
(ASLv2) Apache Commons CSV
The following NOTICE information applies:
Apache Commons CSV
Copyright 2005-2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
(ASLv2) Apache Avro
The following NOTICE information applies:
Apache Avro
Copyright 2009-2017 The Apache Software Foundation
===========================================
MIT License
===========================================
(MIT) Checker Qual
The following NOTICE information applies:
Copyright (c) Copyright 2004-present by the Checker Framework developers
All rights reserved.
https://www.checkerframework.org/
************************
BSD License
************************
The following binary components are provided under the BSD License. See project link for details.
(BSD 3-Clause) Protocol Buffers
The following NOTICE information applies:
Copyright 2008 Google Inc. All rights reserved.
https://github.com/google/protobuf/tree/master/java


@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-protobuf-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-protobuf-services</artifactId>
<properties>
<protobuf.version>3.25.1</protobuf.version>
<wire.version>4.9.3</wire.version>
</properties>
<dependencies>
<!-- Internal dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<!-- External dependencies -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.squareup.wire</groupId>
<artifactId>wire-schema-jvm</artifactId>
<version>${wire.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>


@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf;
import java.util.Arrays;
/**
* Type for proto scalar fields.
*/
public enum FieldType {
DOUBLE("double"),
FLOAT("float"),
INT32("int32"),
INT64("int64"),
UINT32("uint32"),
UINT64("uint64"),
SINT32("sint32"),
SINT64("sint64"),
FIXED32("fixed32"),
FIXED64("fixed64"),
SFIXED32("sfixed32"),
SFIXED64("sfixed64"),
BOOL("bool"),
STRING("string"),
BYTES("bytes");
private final String type;
FieldType(String type) {
this.type = type;
}
public String getType() {
return type;
}
public static FieldType findValue(final String value) {
return Arrays.stream(FieldType.values())
.filter((type -> type.getType().equalsIgnoreCase(value)))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("ProtoType [%s] not found", value)));
}
}
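
For illustration only, a minimal usage sketch of the lookup above (the class and values below are hypothetical, not part of this commit):

// Hypothetical sketch: resolving proto scalar type names via FieldType.findValue.
import org.apache.nifi.services.protobuf.FieldType;

public class FieldTypeUsageSketch {
    public static void main(String[] args) {
        // Lookup is case-insensitive and keyed by the proto scalar type name.
        final FieldType uint32 = FieldType.findValue("uint32");
        System.out.println(uint32);             // UINT32
        System.out.println(uint32.getType());   // uint32

        // Unknown names fail fast with an IllegalArgumentException.
        try {
            FieldType.findValue("varint");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // ProtoType [varint] not found
        }
    }
}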


@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf;
import com.squareup.wire.schema.CoreLoaderKt;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.Schema;
import com.squareup.wire.schema.SchemaLoader;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.services.protobuf.schema.ProtoSchemaStrategy;
import org.apache.nifi.services.protobuf.validation.ProtoValidationResource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileSystems;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"protobuf", "record", "reader", "parser"})
@CapabilityDescription("Parses a Protocol Buffers message from binary format.")
public class ProtobufReader extends SchemaRegistryService implements RecordReaderFactory {
private static final String ANY_PROTO = "google/protobuf/any.proto";
private static final String DURATION_PROTO = "google/protobuf/duration.proto";
private static final String EMPTY_PROTO = "google/protobuf/empty.proto";
private static final String STRUCT_PROTO = "google/protobuf/struct.proto";
private static final String TIMESTAMP_PROTO = "google/protobuf/timestamp.proto";
private static final String WRAPPERS_PROTO = "google/protobuf/wrappers.proto";
private static final AllowableValue GENERATE_FROM_PROTO_FILE = new AllowableValue("generate-from-proto-file",
"Generate from Proto file", "The record schema is generated from the provided proto file");
private volatile String messageType;
private volatile Schema protoSchema;
// Holder of cached proto information so validation does not reload the same proto file over and over
private final AtomicReference<ProtoValidationResource> validationResourceHolder = new AtomicReference<>();
public static final PropertyDescriptor PROTOBUF_DIRECTORY = new PropertyDescriptor.Builder()
.name("Proto Directory")
.displayName("Proto Directory")
.description("Directory containing Protocol Buffers message definition (.proto) file(s).")
.required(true)
.addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor MESSAGE_TYPE = new PropertyDescriptor.Builder()
.name("Message Type")
.displayName("Message Type")
.description("Fully qualified name of the Protocol Buffers message type including its package (eg. mypackage.MyMessage). " +
"The .proto files configured in '" + PROTOBUF_DIRECTORY.getDisplayName() + "' must contain the definition of this message type.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PROTOBUF_DIRECTORY);
properties.add(MESSAGE_TYPE);
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>();
final String protoDirectory = validationContext.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
final String messageType = validationContext.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
if (protoDirectory != null && messageType != null) {
final Schema protoSchema = getSchemaForValidation(protoDirectory);
if (protoSchema.getType(messageType) == null) {
problems.add(new ValidationResult.Builder()
.subject(MESSAGE_TYPE.getDisplayName())
.valid(false)
.explanation(String.format("'%s' message type cannot be found in the provided proto files.", messageType))
.build());
}
}
return problems;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final String protoDirectory = context.getProperty(PROTOBUF_DIRECTORY).evaluateAttributeExpressions().getValue();
messageType = context.getProperty(MESSAGE_TYPE).evaluateAttributeExpressions().getValue();
protoSchema = loadProtoSchema(protoDirectory);
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
if (allowableValue.equalsIgnoreCase(GENERATE_FROM_PROTO_FILE.getValue())) {
return new ProtoSchemaStrategy(messageType, protoSchema);
}
return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
allowableValues.add(GENERATE_FROM_PROTO_FILE);
return allowableValues;
}
@Override
protected AllowableValue getDefaultSchemaAccessStrategy() {
return GENERATE_FROM_PROTO_FILE;
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
return new ProtobufRecordReader(protoSchema, messageType, in, getSchema(variables, in, null));
}
private Schema loadProtoSchema(final String protoDirectory) {
final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
schemaLoader.initRoots(Arrays.asList(Location.get(protoDirectory),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, ANY_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, DURATION_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, EMPTY_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, STRUCT_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, TIMESTAMP_PROTO),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, WRAPPERS_PROTO)), Collections.emptyList());
return schemaLoader.loadSchema();
}
private Schema getSchemaForValidation(final String protoDirectory) {
ProtoValidationResource validationResource = validationResourceHolder.get();
if (validationResource == null || !protoDirectory.equals(validationResource.getProtoDirectory())) {
validationResource = new ProtoValidationResource(protoDirectory, loadProtoSchema(protoDirectory));
validationResourceHolder.set(validationResource);
}
return validationResource.getProtoSchema();
}
}
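
As a hedged configuration sketch (not part of this commit): assuming a directory /tmp/protos that contains a .proto file defining mypackage.MyMessage, the reader could be set up in a test roughly as follows; the nifi-mock test dependency declared in the services POM provides the TestRunner used here.

// Hypothetical configuration sketch using nifi-mock; the directory and message name are assumptions.
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.protobuf.ProtobufReader;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class ProtobufReaderConfigSketch {
    // Trivial processor that only hosts the controller service in the mock framework.
    static class DummyProcessor extends AbstractProcessor {
        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) {
        }
    }

    public static void main(String[] args) throws InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(new DummyProcessor());
        final ProtobufReader reader = new ProtobufReader();
        runner.addControllerService("protobuf-reader", reader);

        // Both properties support Expression Language with environment scope.
        runner.setProperty(reader, ProtobufReader.PROTOBUF_DIRECTORY, "/tmp/protos");
        runner.setProperty(reader, ProtobufReader.MESSAGE_TYPE, "mypackage.MyMessage");

        // Enabling runs customValidate, which loads the proto files once and verifies
        // that the configured message type exists in them.
        runner.enableControllerService(reader);
    }
}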


@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf;
import com.squareup.wire.schema.Schema;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.services.protobuf.converter.ProtobufDataConverter;
import java.io.IOException;
import java.io.InputStream;
public class ProtobufRecordReader implements RecordReader {
private final Schema protoSchema;
private final String messageType;
private final InputStream inputStream;
private RecordSchema recordSchema;
private boolean inputProcessed;
public ProtobufRecordReader(Schema protoSchema, String messageType, InputStream inputStream, RecordSchema recordSchema) {
this.protoSchema = protoSchema;
this.messageType = messageType;
this.inputStream = inputStream;
this.recordSchema = recordSchema;
}
@Override
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException {
if (!inputProcessed) {
final ProtobufDataConverter dataConverter = new ProtobufDataConverter(protoSchema, messageType, recordSchema, coerceTypes, dropUnknownFields);
final Record record = dataConverter.createRecord(inputStream);
inputProcessed = true;
recordSchema = record.getSchema();
return record;
}
return null;
}
@Override
public RecordSchema getSchema() {
return recordSchema;
}
@Override
public void close() throws IOException {
inputStream.close();
}
}
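
A brief, hypothetical sketch of how a caller would drain this reader; like any NiFi RecordReader it is read until nextRecord returns null, and this implementation yields exactly one record per input stream.

// Hypothetical read-loop sketch; obtaining the reader instance is outside this commit.
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.Record;

public class ProtobufReadLoopSketch {
    static void readAll(final RecordReader reader) throws Exception {
        try (reader) {
            Record record;
            while ((record = reader.nextRecord(true, true)) != null) {
                // For ProtobufRecordReader this loop body runs exactly once.
                System.out.println(record.getSchema().getFieldNames());
            }
        }
    }
}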


@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf.converter;
import com.squareup.wire.schema.Field;
import com.squareup.wire.schema.ProtoType;
public class ProtoField {
private final String fieldName;
private final ProtoType protoType;
private final boolean repeatable;
public ProtoField(Field field) {
this(field.getName(), field.getType(), field.isRepeated());
}
public ProtoField(String fieldName, ProtoType protoType) {
this(fieldName, protoType, false);
}
private ProtoField(String fieldName, ProtoType protoType, boolean repeatable) {
this.fieldName = fieldName;
this.protoType = protoType;
this.repeatable = repeatable;
}
public String getFieldName() {
return fieldName;
}
public ProtoType getProtoType() {
return protoType;
}
public boolean isRepeatable() {
return repeatable;
}
}


@@ -0,0 +1,371 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf.converter;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.UnknownFieldSet;
import com.squareup.wire.schema.EnumType;
import com.squareup.wire.schema.Field;
import com.squareup.wire.schema.MessageType;
import com.squareup.wire.schema.OneOf;
import com.squareup.wire.schema.ProtoType;
import com.squareup.wire.schema.Schema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.services.protobuf.FieldType;
import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import static com.google.protobuf.CodedInputStream.decodeZigZag32;
import static com.google.protobuf.TextFormat.unsignedToString;
/**
* This class is responsible for creating a Record by mapping the provided proto schema fields to the list of unknown fields parsed from the encoded proto data.
*/
public class ProtobufDataConverter {
public static final String MAP_KEY_FIELD_NAME = "key";
public static final String MAP_VALUE_FIELD_NAME = "value";
public static final String ANY_TYPE_URL_FIELD_NAME = "type_url";
public static final String ANY_VALUE_FIELD_NAME = "value";
public static final String ANY_MESSAGE_TYPE = "google.protobuf.Any";
private final Schema schema;
private final String rootMessageType;
private final RecordSchema rootRecordSchema;
private final boolean coerceTypes;
private final boolean dropUnknownFields;
private boolean containsAnyField = false;
public ProtobufDataConverter(Schema schema, String messageType, RecordSchema recordSchema, boolean coerceTypes, boolean dropUnknownFields) {
this.schema = schema;
this.rootMessageType = messageType;
this.rootRecordSchema = recordSchema;
this.coerceTypes = coerceTypes;
this.dropUnknownFields = dropUnknownFields;
}
/**
* Creates a record from the root message.
*
* @return created record
* @throws IOException failed to read input stream
*/
public MapRecord createRecord(InputStream data) throws IOException {
final MessageType rootMessageType = (MessageType) schema.getType(this.rootMessageType);
Objects.requireNonNull(rootMessageType, String.format("Message with name [%s] not found in the provided proto files", this.rootMessageType));
MapRecord record = createRecord(rootMessageType, ByteString.readFrom(data), rootRecordSchema);
if (containsAnyField) {
record.regenerateSchema();
}
return record;
}
/**
* Creates a record for the provided message.
*
* @param messageType message to create a record from
* @param data proto message data
* @param recordSchema record schema for the created record
* @return created record
* @throws InvalidProtocolBufferException failed to parse input data
*/
private MapRecord createRecord(MessageType messageType, ByteString data, RecordSchema recordSchema) throws InvalidProtocolBufferException {
final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(data);
if ((ANY_MESSAGE_TYPE).equals(messageType.getType().toString())) {
containsAnyField = true;
return handleAnyField(unknownFieldSet);
}
final Map<String, Object> fieldValues = processMessageFields(messageType, unknownFieldSet);
return new MapRecord(recordSchema, fieldValues, false, dropUnknownFields);
}
/**
* Processes declared, extension and oneOf fields in the provided message.
*
* @param messageType message with fields to be processed
* @param unknownFieldSet received proto data fields
* @return Map of processed fields
*/
private Map<String, Object> processMessageFields(MessageType messageType, UnknownFieldSet unknownFieldSet) throws InvalidProtocolBufferException {
Map<String, Object> recordValues = new HashMap<>();
for (final Field field : messageType.getDeclaredFields()) {
collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()));
}
for (final Field field : messageType.getExtensionFields()) {
collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()));
}
for (final OneOf oneOf : messageType.getOneOfs()) {
for (Field field : oneOf.getFields()) {
collectFieldValue(recordValues, new ProtoField(field), unknownFieldSet.getField(field.getTag()));
}
}
return recordValues;
}
/**
* Checks the field value's presence and sets it into the result Map.
*
* @param fieldNameToConvertedValue Map of converted values
* @param protoField proto field's properties
* @param unknownField field's value
*/
private void collectFieldValue(Map<String, Object> fieldNameToConvertedValue, ProtoField protoField, UnknownFieldSet.Field unknownField) throws InvalidProtocolBufferException {
final Optional<Object> fieldValue = convertFieldValues(protoField, unknownField);
fieldValue.ifPresent(value -> fieldNameToConvertedValue.put(protoField.getFieldName(), value));
}
private Optional<Object> convertFieldValues(ProtoField protoField, UnknownFieldSet.Field unknownField) throws InvalidProtocolBufferException {
if (!unknownField.getLengthDelimitedList().isEmpty()) {
return Optional.of(convertLengthDelimitedFields(protoField, unknownField.getLengthDelimitedList()));
}
if (!unknownField.getFixed32List().isEmpty()) {
return Optional.of(convertFixed32Fields(protoField, unknownField.getFixed32List()));
}
if (!unknownField.getFixed64List().isEmpty()) {
return Optional.of(convertFixed64Fields(protoField, unknownField.getFixed64List()));
}
if (!unknownField.getVarintList().isEmpty()) {
return Optional.of(convertVarintFields(protoField, unknownField.getVarintList()));
}
return Optional.empty();
}
/**
* Converts a Length-Delimited field value into its suitable data type.
*
* @param protoField proto field's properties
* @param values field's unprocessed values
* @return converted field values
* @throws InvalidProtocolBufferException failed to parse input data
*/
private Object convertLengthDelimitedFields(ProtoField protoField, List<ByteString> values) throws InvalidProtocolBufferException {
final ProtoType protoType = protoField.getProtoType();
final Function<ByteString, Object> valueConverter;
if (protoType.isScalar()) {
valueConverter = switch (FieldType.findValue(protoType.getSimpleName())) {
case STRING -> ByteString::toStringUtf8;
case BYTES -> ByteString::toByteArray;
default ->
throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
" [%s] is not LengthDelimited field type", protoField.getFieldName(), protoType.getSimpleName()));
};
} else if (protoType.isMap()) {
return createMap(protoType, values);
} else {
final MessageType messageType = (MessageType) schema.getType(protoType);
Objects.requireNonNull(messageType, String.format("Message type with name [%s] not found in the provided proto files", protoType));
valueConverter = value -> {
try {
Optional<DataType> recordDataType = rootRecordSchema.getDataType(protoField.getFieldName());
RecordSchema recordSchema = recordDataType.map(dataType ->
((RecordDataType) dataType).getChildSchema()).orElse(generateRecordSchema(messageType.getType().toString()));
return createRecord(messageType, value, recordSchema);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Failed to create record from the provided input data for field " + protoField.getFieldName(), e);
}
};
}
return resolveFieldValue(protoField, values, valueConverter);
}
/**
* Converts a Fixed32 field value into its suitable data type.
*
* @param protoField proto field's properties
* @param values field's unprocessed values
* @return converted field values
*/
private Object convertFixed32Fields(ProtoField protoField, List<Integer> values) {
final String typeName = protoField.getProtoType().getSimpleName();
final Function<Integer, Object> valueConverter =
switch (FieldType.findValue(typeName)) {
case FIXED32 -> value -> Long.parseLong(unsignedToString(value));
case SFIXED32 -> value -> value;
case FLOAT -> Float::intBitsToFloat;
default ->
throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
" [%s] is not Fixed32 field type", protoField.getFieldName(), typeName));
};
return resolveFieldValue(protoField, values, valueConverter);
}
/**
* Converts a Fixed64 field value into its suitable data type.
*
* @param protoField proto field's properties
* @param values field's unprocessed values
* @return converted field values
*/
private Object convertFixed64Fields(ProtoField protoField, List<Long> values) {
final String typeName = protoField.getProtoType().getSimpleName();
final Function<Long, Object> valueConverter =
switch (FieldType.findValue(typeName)) {
case FIXED64 -> value -> new BigInteger(unsignedToString(value));
case SFIXED64 -> value -> value;
case DOUBLE -> Double::longBitsToDouble;
default ->
throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
" [%s] is not Fixed64 field type", protoField.getFieldName(), typeName));
};
return resolveFieldValue(protoField, values, valueConverter);
}
/**
* Converts a Varint field value into its suitable data type.
*
* @param protoField proto field's properties
* @param values field's unprocessed values
* @return converted field values
*/
private Object convertVarintFields(ProtoField protoField, List<Long> values) {
final ProtoType protoType = protoField.getProtoType();
final Function<Long, Object> valueConverter;
if (protoType.isScalar()) {
valueConverter = switch (FieldType.findValue(protoType.getSimpleName())) {
case BOOL -> value -> value.equals(1L);
case INT32, SFIXED32 -> Long::intValue;
case UINT32, INT64, SFIXED64 -> value -> value;
case UINT64 -> value -> new BigInteger(unsignedToString(value));
case SINT32 -> value -> decodeZigZag32(value.intValue());
case SINT64 -> CodedInputStream::decodeZigZag64;
default ->
throw new IllegalStateException(String.format("Incompatible value was received for field [%s]," +
" [%s] is not Varint field type", protoField.getFieldName(), protoType.getSimpleName()));
};
} else {
valueConverter = value -> {
final EnumType enumType = (EnumType) schema.getType(protoType);
Objects.requireNonNull(enumType, String.format("Enum with name [%s] not found in the provided proto files", protoType));
return enumType.constant(Integer.parseInt(value.toString())).getName();
};
}
return resolveFieldValue(protoField, values, valueConverter);
}
private <T> Object resolveFieldValue(ProtoField protoField, List<T> values, Function<T, Object> valueConverter) {
List<Object> resultValues = values.stream().map(valueConverter).toList();
if (coerceTypes) {
final Optional<RecordField> recordField = rootRecordSchema.getField(protoField.getFieldName());
if (recordField.isPresent()) {
resultValues = resultValues.stream().map(value -> DataTypeUtils.convertType(value, recordField.get().getDataType(), recordField.get().getFieldName())).toList();
}
}
if (!protoField.isRepeatable()) {
return resultValues.get(0);
} else {
return resultValues.toArray();
}
}
/**
* Handles Map type creation in the record.
*
* @param protoType field's proto type
* @param data data to be processed
* @return created Map
* @throws InvalidProtocolBufferException failed to parse input data
*/
private Map<String, Object> createMap(ProtoType protoType, List<ByteString> data) throws InvalidProtocolBufferException {
Map<String, Object> mapResult = new HashMap<>();
for (final ByteString entry : data) {
final UnknownFieldSet unknownFieldSet = UnknownFieldSet.parseFrom(entry);
Map<String, Object> mapEntry = new HashMap<>();
collectFieldValue(mapEntry, new ProtoField(MAP_KEY_FIELD_NAME, protoType.getKeyType()), unknownFieldSet.getField(1));
collectFieldValue(mapEntry, new ProtoField(MAP_VALUE_FIELD_NAME, protoType.getValueType()), unknownFieldSet.getField(2));
mapResult.put(String.valueOf(mapEntry.get(MAP_KEY_FIELD_NAME)), mapEntry.get(MAP_VALUE_FIELD_NAME));
}
return mapResult;
}
/**
* Processes a 'google.protobuf.Any' typed field. The method gets the schema for the message type provided in the 'type_url' property
* and parses the serialized message from the 'value' field. The resulting record will contain only the parsed message's fields.
*
* @param unknownFieldSet 'google.protobuf.Any' typed message's field list
* @return created record from the parsed message
* @throws InvalidProtocolBufferException failed to parse input data
*/
private MapRecord handleAnyField(UnknownFieldSet unknownFieldSet) throws InvalidProtocolBufferException {
Map<String, Object> recordValues = new HashMap<>();
collectFieldValue(recordValues, new ProtoField(ANY_TYPE_URL_FIELD_NAME, ProtoType.STRING), unknownFieldSet.getField(1));
collectFieldValue(recordValues, new ProtoField(ANY_VALUE_FIELD_NAME, ProtoType.BYTES), unknownFieldSet.getField(2));
final String typeName = String.valueOf(recordValues.get(ANY_TYPE_URL_FIELD_NAME));
final UnknownFieldSet anyFieldSet = UnknownFieldSet.parseFrom((byte[]) recordValues.get(ANY_VALUE_FIELD_NAME));
final MessageType messageType = (MessageType) schema.getType(getQualifiedTypeName(typeName));
Objects.requireNonNull(messageType, String.format("Message type with name [%s] not found in the provided proto files", typeName));
return new MapRecord(generateRecordSchema(typeName), processMessageFields(messageType, anyFieldSet), false, dropUnknownFields);
}
/**
* Generates a schema for the provided message type
*
* @param typeName name of the message
* @return generated schema
*/
private RecordSchema generateRecordSchema(String typeName) {
final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schema);
return schemaParser.createSchema(getQualifiedTypeName(typeName));
}
/**
* Gets the fully qualified name of the message type.
*
* @param typeName name of the message
* @return fully qualified name of the message type
*/
private String getQualifiedTypeName(String typeName) {
return typeName.substring(typeName.lastIndexOf('/') + 1);
}
}
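
As a worked illustration of the varint handling above (the raw values are chosen for illustration, not taken from this commit): sint32/sint64 fields are zigzag-encoded on the wire, so the raw varints exposed by UnknownFieldSet.Field#getVarintList must be zigzag-decoded before they match the signed values the producer wrote.

// Worked zigzag example using the same protobuf helpers as convertVarintFields.
import static com.google.protobuf.CodedInputStream.decodeZigZag32;
import static com.google.protobuf.CodedInputStream.decodeZigZag64;

public class ZigZagSketch {
    public static void main(String[] args) {
        final long rawSint32 = 3L; // zigzag-encoded form of -2
        final long rawSint64 = 4L; // zigzag-encoded form of 2
        System.out.println(decodeZigZag32((int) rawSint32)); // -2
        System.out.println(decodeZigZag64(rawSint64));       // 2
        // bool fields also arrive as varints; the converter maps the raw value 1L to true.
        System.out.println(Long.valueOf(1L).equals(1L));     // true
    }
}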


@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf.schema;
import com.squareup.wire.schema.EnumConstant;
import com.squareup.wire.schema.EnumType;
import com.squareup.wire.schema.Field;
import com.squareup.wire.schema.MessageType;
import com.squareup.wire.schema.OneOf;
import com.squareup.wire.schema.ProtoType;
import com.squareup.wire.schema.Schema;
import com.squareup.wire.schema.Type;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.services.protobuf.FieldType;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* Creates a {@link RecordSchema} for the provided proto schema.
*/
public class ProtoSchemaParser {
private final Schema schema;
public ProtoSchemaParser(Schema schema) {
this.schema = schema;
}
/**
* Creates a {@link RecordSchema} for the provided message type.
* @param messageTypeName proto message type
* @return record schema
*/
public RecordSchema createSchema(String messageTypeName) {
final MessageType messageType = (MessageType) schema.getType(messageTypeName);
Objects.requireNonNull(messageType, String.format("Message type with name [%s] not found in the provided proto files", messageTypeName));
List<RecordField> recordFields = new ArrayList<>();
recordFields.addAll(processFields(messageType.getDeclaredFields()));
recordFields.addAll(processFields(messageType.getExtensionFields()));
recordFields.addAll(processOneOfFields(messageType));
return new SimpleRecordSchema(recordFields);
}
/**
* Iterates through and processes OneOf fields in the given message type.
* @param messageType message type
* @return list of {@link RecordField}s generated from the OneOf fields
*/
private List<RecordField> processOneOfFields(MessageType messageType) {
List<RecordField> recordFields = new ArrayList<>();
for (final OneOf oneOf : messageType.getOneOfs()) {
for (Field field : oneOf.getFields()) {
final DataType dataType = getDataTypeForField(field.getType());
recordFields.add(new RecordField(field.getName(), dataType, field.getDefault(), true));
}
}
return recordFields;
}
/**
* Iterates through and processes fields in the given message type.
* @return list of {@link RecordField}s generated from the provided fields
*/
private List<RecordField> processFields(List<Field> fields) {
List<RecordField> recordFields = new ArrayList<>();
for (final Field field : fields) {
DataType dataType = getDataTypeForField(field.getType());
if (field.isRepeated()) {
dataType = RecordFieldType.ARRAY.getArrayDataType(dataType);
}
recordFields.add(new RecordField(field.getName(), dataType, field.getDefault(), !field.isRequired()));
}
return recordFields;
}
/**
* Checks the provided field's type and calls the proper {@link DataType} processing function.
* @param protoType field's type
* @return data type
*/
private DataType getDataTypeForField(ProtoType protoType) {
if (protoType.isScalar()) {
return getDataTypeForScalarField(protoType);
} else {
return getDataTypeForCompositeField(protoType);
}
}
/**
* Gets the suitable {@link DataType} for the provided composite field.
* @param protoType field's type
* @return data type
*/
private DataType getDataTypeForCompositeField(ProtoType protoType) {
if (protoType.isMap()) {
final DataType valueType = getDataTypeForField(protoType.getValueType());
return new MapDataType(valueType);
}
final Type fieldType = schema.getType(protoType);
if (fieldType instanceof MessageType) {
final RecordSchema recordSchema = createSchema(protoType.toString());
return new RecordDataType(recordSchema);
} else if (fieldType instanceof EnumType) {
return new EnumDataType(((EnumType) fieldType).getConstants().stream().map(EnumConstant::getName).toList());
} else {
throw new IllegalStateException("Unknown proto type: " + fieldType);
}
}
/**
* Gets the suitable {@link DataType} for the provided scalar field.
* @param protoType field's type
* @return data type
*/
private DataType getDataTypeForScalarField(ProtoType protoType) {
return switch (FieldType.findValue(protoType.getSimpleName())) {
case DOUBLE -> RecordFieldType.DOUBLE.getDataType();
case FLOAT -> RecordFieldType.FLOAT.getDataType();
case INT32, SFIXED32 -> RecordFieldType.INT.getDataType();
case UINT32, SINT32, FIXED32, INT64, SINT64, SFIXED64 -> RecordFieldType.LONG.getDataType();
case UINT64, FIXED64 -> RecordFieldType.BIGINT.getDataType();
case BOOL -> RecordFieldType.BOOLEAN.getDataType();
case STRING -> RecordFieldType.STRING.getDataType();
case BYTES -> RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
};
}
}
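
To make the mapping concrete, a hedged sketch of the field types this parser would be expected to produce for a hypothetical message (the message and field names are invented): message Sensor { int32 id = 1; repeated string tags = 2; map<string, int64> counters = 3; }.

// Expected field types for the hypothetical Sensor message, per the mapping rules above.
import java.util.List;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.MapDataType;

public class ExpectedSchemaSketch {
    public static RecordSchema expectedSensorSchema() {
        return new SimpleRecordSchema(List.of(
                // int32 -> INT (getDataTypeForScalarField)
                new RecordField("id", RecordFieldType.INT.getDataType()),
                // repeated string -> ARRAY of STRING (processFields wraps repeated fields)
                new RecordField("tags", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())),
                // map<string, int64> -> MAP with LONG values (getDataTypeForCompositeField)
                new RecordField("counters", new MapDataType(RecordFieldType.LONG.getDataType()))));
    }
}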


@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf.schema;
import com.squareup.wire.schema.Schema;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.InputStream;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
public class ProtoSchemaStrategy implements SchemaAccessStrategy {
private final String messageType;
private final Schema schema;
public ProtoSchemaStrategy(String messageType, Schema schema) {
this.messageType = messageType;
this.schema = schema;
}
@Override
public RecordSchema getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema) {
final ProtoSchemaParser schemaParser = new ProtoSchemaParser(schema);
return schemaParser.createSchema(messageType);
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return EnumSet.noneOf(SchemaField.class);
}
}


@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf.validation;
import com.squareup.wire.schema.Schema;
public class ProtoValidationResource {
private final String protoDirectory;
private final Schema protoSchema;
public ProtoValidationResource(String protoDirectory, Schema protoSchema) {
this.protoDirectory = protoDirectory;
this.protoSchema = protoSchema;
}
public String getProtoDirectory() {
return protoDirectory;
}
public Schema getProtoSchema() {
return protoSchema;
}
}


@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.services.protobuf.ProtobufReader


@@ -0,0 +1,189 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>ProtobufReader</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<p>
The ProtobufReader Controller Service reads and parses a Protocol Buffers Message from its binary format and creates a Record object.
The Controller Service must be configured with the same '.proto' file that was used for encoding the Message, and with the fully qualified
name of the Message type including its package (e.g. mypackage.MyMessage). The Reader always generates one record from the input
data, representing the provided Protocol Buffers Message type.
Further information about Protocol Buffers can be found here:
<a href="https://protobuf.dev/">protobuf.dev</a>
</p>
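<p>
    For example, assuming a hypothetical '.proto' file with the following content, the Message type configured on the Reader
    would be 'mypackage.MyMessage':
</p>
<code><pre>
syntax = "proto3";
package mypackage;

message MyMessage {
    string name = 1;
    int32 count = 2;
}
</pre></code>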
<h2>Data Type Mapping</h2>
<p>
When a record is parsed from incoming data, the Controller Service maps the Proto Message field types to the corresponding
NiFi data types. The mapping between the provided Message fields and the encoded input is always based on the field tag numbers.
When a field is defined as 'repeated', its data type will be an array whose element type is the field's originally specified type
(a short example follows the tables below). The following tables show which proto field type corresponds to which NiFi field type
after the conversion.
</p>
<h3>
Scalar Value Types
</h3>
<p>
<table>
<tr><th>Proto Type</th><th>Proto Wire Type</th><th>NiFi Data Type</th></tr>
<tr><td>double</td><td>fixed64</td><td>double</td></tr>
<tr><td>float</td><td>fixed32</td><td>float</td></tr>
<tr><td>int32</td><td>varint</td><td>int</td></tr>
<tr><td>int64</td><td>varint</td><td>long</td></tr>
<tr><td>uint32</td><td>varint</td><td>long</td></tr>
<tr><td>uint64</td><td>varint</td><td>bigint</td></tr>
<tr><td>sint32</td><td>varint</td><td>long</td></tr>
<tr><td>sint64</td><td>varint</td><td>long</td></tr>
<tr><td>fixed32</td><td>fixed32</td><td>long</td></tr>
<tr><td>fixed64</td><td>fixed64</td><td>bigint</td></tr>
<tr><td>sfixed32</td><td>fixed32</td><td>int</td></tr>
<tr><td>sfixed64</td><td>fixed64</td><td>long</td></tr>
<tr><td>bool</td><td>varint</td><td>boolean</td></tr>
<tr><td>string</td><td>length delimited</td><td>string</td></tr>
<tr><td>bytes</td><td>length delimited</td><td>array[byte]</td></tr>
</table>
</p>
<h3>
Composite Value Types
</h3>
<p>
<table>
<tr><th>Proto Type</th><th>Proto Wire Type</th><th>NiFi Data Type</th></tr>
<tr><td>message</td><td>length delimited</td><td>record</td></tr>
<tr><td>enum</td><td>varint</td><td>enum</td></tr>
<tr><td>map</td><td>length delimited</td><td>map</td></tr>
<tr><td>oneof</td><td>-</td><td>choice</td></tr>
</table>
</p>
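<p>
    For instance, a hypothetical repeated field is mapped to an array whose element type follows the tables above:
</p>
<code><pre>
message TagList {
    repeated string tags = 1;   // becomes array[string] in the Record schema
}
</pre></code>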
<h2>Schemas and Type Coercion</h2>
<p>
When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
the schema, that field will be stored in the Record's value list with its original type. If the field is found in the schema,
the data type of the received data is compared against the data type specified in the schema. If the types match, the value
of that field is used as-is. If the schema indicates that the field should be of a different type, then the Controller Service
will attempt to coerce the data into the type specified by the schema. If the field cannot be coerced into the specified type,
an Exception will be thrown.
</p>
<p>
The following rules apply when attempting to coerce a field value from one data type to another:
</p>
<ul>
<li>Any data type can be coerced into a String type.</li>
<li>Any numeric data type (Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
<li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
<li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
or "Timestamp Format."</li>
<li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
<code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
type but not an Integer.</li>
<li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
<li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
and the rest of the characters are ignored.</li>
</ul>
<p>
If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
will be thrown.
</p>
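<p>
    As a hypothetical illustration, assume the incoming data was encoded with the following Message while the configured schema declares
    the 'count' field as a string:
</p>
<code><pre>
message CoercionExample {
    string name = 1;
    int32 count = 2;
}
</pre></code>
<p>
    The 'count' value arrives as an int, but because the schema asks for a string it is coerced, so an encoded value of 42 ends up in the
    Record as the string "42".
</p>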
<h2>Schema Access Strategy</h2>
<p>
Besides the common Schema Access strategies, such as getting the schema from a property value or accessing it from a Schema Registry,
the ProtobufReader Controller Service offers an additional access strategy option called "Generate from Proto file". When using this strategy,
the Reader generates the Record Schema from the provided '.proto' file and Message type. This strategy is recommended when the user
does not want to create the schema manually or when no type coercion is needed.
</p>
<h2>Protobuf Any Field Type</h2>
<p>
Protocol Buffers offers additional Message types called Well-Known Types. These are predefined messages that describe
complex structured types and wrappers for scalar types. The Any type is one of these Well-Known Types; it is used to store an arbitrary
serialized Message along with a URL that describes the type of the serialized Message. Since the Message type and the embedded Message are
available only once the Any Message has been populated with data, the ProtobufReader has to process the embedded Message at data conversion time.
The Reader is able to generate the schema for the embedded Message in the Any field and substitute it into the resulting Record schema.
</p>
<h3>Example</h3>
<p>
There is a Message called 'TestMessage' which has only one field, an Any typed field. There is another Message called 'NestedMessage'
that we would like to store as a serialized Message in the value of 'anyField'.
</p>
<code><pre>
message Any {
string type_url = 1;
bytes value = 2;
}
message TestMessage {
google.protobuf.Any anyField = 3;
}
message NestedMessage {
string field_1 = 1;
string field_2 = 2;
string field_3 = 3;
}
</pre></code>
<p>
With normal data conversion, the result would look like this:
</p>
<code><pre>
{
anyField : {
type_url : "type.googleapis.com/NestedMessage"
value : [84, 101, 115, 116, 32, 98, 121, 116, 101, 115]
}
}
</pre></code>
<p>
The result after the ProtobufReader replaces the Any Message's fields with the processed embedded Message:
</p>
<code><pre>
{
anyField : {
field_1 : "value 1",
field_2 : "value 2",
field_3 : "value 3"
}
}
</pre></code>
</body>
</html>

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.squareup.wire.schema.CoreLoaderKt;
import com.squareup.wire.schema.Location;
import com.squareup.wire.schema.Schema;
import com.squareup.wire.schema.SchemaLoader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileSystems;
import java.util.Arrays;
import java.util.Collections;
import static org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_KEY_FIELD_NAME;
import static org.apache.nifi.services.protobuf.converter.ProtobufDataConverter.MAP_VALUE_FIELD_NAME;
public class ProtoTestUtil {
public static final String BASE_TEST_PATH = "src/test/resources/";
public static Schema loadProto3TestSchema() {
final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
schemaLoader.initRoots(Collections.singletonList(Location.get(BASE_TEST_PATH + "test_proto3.proto")), Collections.emptyList());
return schemaLoader.loadSchema();
}
public static Schema loadProto2TestSchema() {
final SchemaLoader schemaLoader = new SchemaLoader(FileSystems.getDefault());
schemaLoader.initRoots(Arrays.asList(
Location.get(BASE_TEST_PATH, "test_proto2.proto"),
Location.get(CoreLoaderKt.WIRE_RUNTIME_JAR, "google/protobuf/any.proto")), Collections.emptyList());
return schemaLoader.loadSchema();
}
public static InputStream generateInputDataForProto3() throws IOException, Descriptors.DescriptorValidationException {
DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH + "test_proto3.desc"));
Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new Descriptors.FileDescriptor[0]);
Descriptors.Descriptor messageDescriptor = fileDescriptor.findMessageTypeByName("Proto3Message");
Descriptors.Descriptor nestedMessageDescriptor = fileDescriptor.findMessageTypeByName("NestedMessage");
Descriptors.EnumDescriptor enumValueDescriptor = fileDescriptor.findEnumTypeByName("TestEnum");
Descriptors.Descriptor mapDescriptor = nestedMessageDescriptor.findNestedTypeByName("TestMapEntry");
DynamicMessage mapEntry1 = DynamicMessage
.newBuilder(mapDescriptor)
.setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), "test_key_entry1")
.setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 101)
.build();
DynamicMessage mapEntry2 = DynamicMessage
.newBuilder(mapDescriptor)
.setField(mapDescriptor.findFieldByName(MAP_KEY_FIELD_NAME), "test_key_entry2")
.setField(mapDescriptor.findFieldByName(MAP_VALUE_FIELD_NAME), 202)
.build();
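// Build the NestedMessage: enum (field 20), repeated strings (field 21), oneof options (fields 22-24) and map entries (field 25).
// Fields 22 and 24 belong to the same oneof, so setting field 24 last leaves only the int32 option populated.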
DynamicMessage nestedMessage = DynamicMessage
.newBuilder(nestedMessageDescriptor)
.setField(nestedMessageDescriptor.findFieldByNumber(20), enumValueDescriptor.findValueByNumber(2))
.addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 1")
.addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 2")
.addRepeatedField(nestedMessageDescriptor.findFieldByNumber(21), "Repeated 3")
.setField(nestedMessageDescriptor.findFieldByNumber(22), "One Of Option")
.setField(nestedMessageDescriptor.findFieldByNumber(23), true)
.setField(nestedMessageDescriptor.findFieldByNumber(24), 3)
.setField(nestedMessageDescriptor.findFieldByNumber(25), Arrays.asList(mapEntry1, mapEntry2))
.build();
DynamicMessage message = DynamicMessage
.newBuilder(messageDescriptor)
.setField(messageDescriptor.findFieldByNumber(1), true)
.setField(messageDescriptor.findFieldByNumber(2), "Test text")
.setField(messageDescriptor.findFieldByNumber(3), Integer.MAX_VALUE)
.setField(messageDescriptor.findFieldByNumber(4), -1)
.setField(messageDescriptor.findFieldByNumber(5), Integer.MIN_VALUE)
.setField(messageDescriptor.findFieldByNumber(6), -2)
.setField(messageDescriptor.findFieldByNumber(7), Integer.MAX_VALUE)
.setField(messageDescriptor.findFieldByNumber(8), Double.MAX_VALUE)
.setField(messageDescriptor.findFieldByNumber(9), Float.MAX_VALUE)
.setField(messageDescriptor.findFieldByNumber(10), "Test bytes".getBytes())
.setField(messageDescriptor.findFieldByNumber(11), Long.MAX_VALUE)
.setField(messageDescriptor.findFieldByNumber(12), -1L)
.setField(messageDescriptor.findFieldByNumber(13), Long.MIN_VALUE)
.setField(messageDescriptor.findFieldByNumber(14), -2L)
.setField(messageDescriptor.findFieldByNumber(15), Long.MAX_VALUE)
.setField(messageDescriptor.findFieldByNumber(16), nestedMessage)
.build();
return message.toByteString().newInput();
}
public static InputStream generateInputDataForProto2() throws IOException, Descriptors.DescriptorValidationException {
DescriptorProtos.FileDescriptorSet anyDescriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH + "google/protobuf/any.desc"));
Descriptors.FileDescriptor anyDesc = Descriptors.FileDescriptor.buildFrom(anyDescriptorSet.getFile(0), new Descriptors.FileDescriptor[]{});
DescriptorProtos.FileDescriptorSet descriptorSet = DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(BASE_TEST_PATH + "test_proto2.desc"));
Descriptors.FileDescriptor fileDescriptor = Descriptors.FileDescriptor.buildFrom(descriptorSet.getFile(0), new Descriptors.FileDescriptor[]{anyDesc});
Descriptors.Descriptor messageDescriptor = fileDescriptor.findMessageTypeByName("Proto2Message");
Descriptors.Descriptor anyTestDescriptor = fileDescriptor.findMessageTypeByName("AnyValueMessage");
Descriptors.FieldDescriptor fieldDescriptor = fileDescriptor.findExtensionByName("extensionField");
Descriptors.Descriptor anyDescriptor = anyDesc.findMessageTypeByName("Any");
DynamicMessage anyTestMessage = DynamicMessage
.newBuilder(anyTestDescriptor)
.setField(anyTestDescriptor.findFieldByNumber(1), "Test field 1")
.setField(anyTestDescriptor.findFieldByNumber(2), "Test field 2")
.build();
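// Wrap the serialized AnyValueMessage in a google.protobuf.Any: field 1 is the type URL, field 2 holds the serialized bytes.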
DynamicMessage anyMessage = DynamicMessage
.newBuilder(anyDescriptor)
.setField(anyDescriptor.findFieldByNumber(1), "type.googleapis.com/AnyValueMessage")
.setField(anyDescriptor.findFieldByNumber(2), anyTestMessage.toByteArray())
.build();
DynamicMessage message = DynamicMessage
.newBuilder(messageDescriptor)
.setField(messageDescriptor.findFieldByNumber(1), true)
.setField(messageDescriptor.findFieldByNumber(3), anyMessage)
.setField(fieldDescriptor, Integer.MAX_VALUE)
.build();
return message.toByteString().newInput();
}
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf;
import com.google.protobuf.Descriptors;
import com.squareup.wire.schema.Schema;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.generateInputDataForProto3;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
public class TestProtobufRecordReader {
private static Schema protoSchema;
@BeforeAll
public static void setup(){
protoSchema = loadProto3TestSchema();
}
@Test
public void testReadRecord() throws Descriptors.DescriptorValidationException, IOException {
final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema());
final Record record = reader.nextRecord(false, false);
final Object field1 = record.getValue("booleanField");
assertEquals(true, field1);
assertInstanceOf(Boolean.class, field1);
final Object field2 = record.getValue("stringField");
assertEquals("Test text", field2);
assertInstanceOf(String.class, field2);
final Object field3 = record.getValue("int32Field");
assertEquals(Integer.MAX_VALUE, field3);
assertInstanceOf(Integer.class, field3);
final Object field4 = record.getValue("uint32Field");
assertNotNull(field4);
}
@Test
public void testReadRecordWithCoerceType() throws Descriptors.DescriptorValidationException, IOException {
final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema());
final Record record = reader.nextRecord(true, false);
final Object field1 = record.getValue("booleanField");
assertEquals("true", field1);
assertInstanceOf(String.class, field1);
final Object field2 = record.getValue("stringField");
assertEquals("Test text", field2);
assertInstanceOf(String.class, field2);
final Object field3 = record.getValue("int32Field");
assertEquals(String.valueOf(Integer.MAX_VALUE), field3);
assertInstanceOf(String.class, field3);
final Object field4 = record.getValue("uint32Field");
assertNotNull(field4);
}
@Test
public void testReadRecordWithDropUnknownFields() throws Descriptors.DescriptorValidationException, IOException {
final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema());
final Record record = reader.nextRecord(false, true);
final Object field1 = record.getValue("booleanField");
assertEquals(true, field1);
assertInstanceOf(Boolean.class, field1);
final Object field2 = record.getValue("stringField");
assertEquals("Test text", field2);
assertInstanceOf(String.class, field2);
final Object field3 = record.getValue("int32Field");
assertEquals(Integer.MAX_VALUE, field3);
assertInstanceOf(Integer.class, field3);
final Object field4 = record.getValue("uint32Field");
assertNull(field4);
}
@Test
public void testReadRecordWithCoerceTypeAndDropUnknownFields() throws Descriptors.DescriptorValidationException, IOException {
final ProtobufRecordReader reader = createReader(generateInputDataForProto3(), "Proto3Message", protoSchema, generateRecordSchema());
final Record record = reader.nextRecord(true, true);
final Object field1 = record.getValue("booleanField");
assertEquals("true", field1);
assertInstanceOf(String.class, field1);
final Object field2 = record.getValue("stringField");
assertEquals("Test text", field2);
assertInstanceOf(String.class, field2);
final Object field3 = record.getValue("int32Field");
assertEquals(String.valueOf(Integer.MAX_VALUE), field3);
assertInstanceOf(String.class, field3);
final Object field4 = record.getValue("uint32Field");
assertNull(field4);
}
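// The test schema declares every field as STRING, so with type coercion enabled the reader converts the proto values to strings.
// 'uint32Field' is intentionally left out of the schema so it is removed when unknown fields are dropped.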
private RecordSchema generateRecordSchema() {
final List<RecordField> fields = new ArrayList<>();
for (final String fieldName : new String[] {"booleanField", "stringField", "int32Field"}) {
fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType()));
}
return new SimpleRecordSchema(fields);
}
private ProtobufRecordReader createReader(InputStream in, String message, Schema schema, RecordSchema recordSchema) {
return new ProtobufRecordReader(schema, message, in, recordSchema);
}
}

View File

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf.converter;
import com.google.protobuf.Descriptors;
import com.squareup.wire.schema.Schema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.services.protobuf.ProtoTestUtil;
import org.apache.nifi.services.protobuf.schema.ProtoSchemaParser;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Map;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestProtobufDataConverter {
@Test
public void testDataConverterForProto3() throws Descriptors.DescriptorValidationException, IOException {
final Schema schema = loadProto3TestSchema();
final RecordSchema recordSchema = new ProtoSchemaParser(schema).createSchema("Proto3Message");
final ProtobufDataConverter dataConverter = new ProtobufDataConverter(schema, "Proto3Message", recordSchema, false, false);
final MapRecord record = dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto3());
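// Unsigned 32-bit fields are widened to long and unsigned 64-bit fields to BigInteger, so the -1 and -2 values
// written by ProtoTestUtil read back as their unsigned equivalents below.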
assertEquals(true, record.getValue("booleanField"));
assertEquals("Test text", record.getValue("stringField"));
assertEquals(Integer.MAX_VALUE, record.getValue("int32Field"));
assertEquals(4294967295L, record.getValue("uint32Field"));
assertEquals(Integer.MIN_VALUE, record.getValue("sint32Field"));
assertEquals(4294967294L, record.getValue("fixed32Field"));
assertEquals(Integer.MAX_VALUE, record.getValue("sfixed32Field"));
assertEquals(Double.MAX_VALUE, record.getValue("doubleField"));
assertEquals(Float.MAX_VALUE, record.getValue("floatField"));
assertArrayEquals("Test bytes".getBytes(), (byte[]) record.getValue("bytesField"));
assertEquals(Long.MAX_VALUE, record.getValue("int64Field"));
assertEquals(new BigInteger("18446744073709551615"), DataTypeUtils.toBigInt(record.getValue("uint64Field"), "field12"));
assertEquals(Long.MIN_VALUE, record.getValue("sint64Field"));
assertEquals(new BigInteger("18446744073709551614"), DataTypeUtils.toBigInt(record.getValue("fixed64Field"), "field14"));
assertEquals(Long.MAX_VALUE, record.getValue("sfixed64Field"));
final MapRecord nestedRecord = (MapRecord) record.getValue("nestedMessage");
assertEquals("ENUM_VALUE_3", nestedRecord.getValue("testEnum"));
assertArrayEquals(new Object[]{"Repeated 1", "Repeated 2", "Repeated 3"}, (Object[]) nestedRecord.getValue("repeatedField"));
// assert only one field is set in the OneOf field
assertNull(nestedRecord.getValue("stringOption"));
assertNull(nestedRecord.getValue("booleanOption"));
assertEquals(3, nestedRecord.getValue("int32Option"));
assertEquals(Map.of("test_key_entry1", 101, "test_key_entry2", 202), nestedRecord.getValue("testMap"));
}
@Test
public void testDataConverterForProto2() throws Descriptors.DescriptorValidationException, IOException {
final Schema schema = loadProto2TestSchema();
final RecordSchema recordSchema = new ProtoSchemaParser(schema).createSchema("Proto2Message");
final ProtobufDataConverter dataConverter = new ProtobufDataConverter(schema, "Proto2Message", recordSchema, false, false);
final MapRecord record = dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto2());
assertEquals(true, record.getValue("booleanField"));
assertEquals("Missing field", record.getValue("stringField"));
assertEquals(Integer.MAX_VALUE, record.getValue("extensionField"));
final MapRecord anyValueRecord = (MapRecord) record.getValue("anyField");
assertEquals("Test field 1", anyValueRecord.getValue("anyStringField1"));
assertEquals("Test field 2", anyValueRecord.getValue("anyStringField2"));
}
@Test
public void testMissingMessage() {
final Schema schema = loadProto3TestSchema();
final RecordSchema recordSchema = new ProtoSchemaParser(schema).createSchema("Proto3Message");
final ProtobufDataConverter dataConverter = new ProtobufDataConverter(schema, "MissingMessage", recordSchema, false, false);
NullPointerException e = assertThrows(NullPointerException.class, () -> dataConverter.createRecord(ProtoTestUtil.generateInputDataForProto2()));
assertTrue(e.getMessage().contains("Message with name [MissingMessage] not found in the provided proto files"), e.getMessage());
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.services.protobuf.schema;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto2TestSchema;
import static org.apache.nifi.services.protobuf.ProtoTestUtil.loadProto3TestSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestProtoSchemaParser {
@Test
public void testSchemaParserForProto3() {
final ProtoSchemaParser schemaParser = new ProtoSchemaParser(loadProto3TestSchema());
final SimpleRecordSchema expected = new SimpleRecordSchema(Arrays.asList(
new RecordField("booleanField", RecordFieldType.BOOLEAN.getDataType()),
new RecordField("stringField", RecordFieldType.STRING.getDataType()),
new RecordField("int32Field", RecordFieldType.INT.getDataType()),
new RecordField("uint32Field", RecordFieldType.LONG.getDataType()),
new RecordField("sint32Field", RecordFieldType.LONG.getDataType()),
new RecordField("fixed32Field", RecordFieldType.LONG.getDataType()),
new RecordField("sfixed32Field", RecordFieldType.INT.getDataType()),
new RecordField("doubleField", RecordFieldType.DOUBLE.getDataType()),
new RecordField("floatField", RecordFieldType.FLOAT.getDataType()),
new RecordField("bytesField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType())),
new RecordField("int64Field", RecordFieldType.LONG.getDataType()),
new RecordField("uint64Field", RecordFieldType.BIGINT.getDataType()),
new RecordField("sint64Field", RecordFieldType.LONG.getDataType()),
new RecordField("fixed64Field", RecordFieldType.BIGINT.getDataType()),
new RecordField("sfixed64Field", RecordFieldType.LONG.getDataType()),
new RecordField("nestedMessage", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
new RecordField("testEnum", RecordFieldType.ENUM.getEnumDataType(Arrays.asList("ENUM_VALUE_1", "ENUM_VALUE_2", "ENUM_VALUE_3"))),
new RecordField("repeatedField", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())),
new RecordField("testMap", RecordFieldType.MAP.getMapDataType(RecordFieldType.INT.getDataType())),
new RecordField("stringOption", RecordFieldType.STRING.getDataType()),
new RecordField("booleanOption", RecordFieldType.BOOLEAN.getDataType()),
new RecordField("int32Option", RecordFieldType.INT.getDataType())
))))
));
final RecordSchema actual = schemaParser.createSchema("Proto3Message");
assertEquals(expected, actual);
}
@Test
public void testSchemaParserForProto2() {
final ProtoSchemaParser schemaParser = new ProtoSchemaParser(loadProto2TestSchema());
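// proto2 'required' fields map to non-nullable RecordFields, and the [default = "Missing field"] option becomes the RecordField default value.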
final SimpleRecordSchema expected = new SimpleRecordSchema(Arrays.asList(
new RecordField("booleanField", RecordFieldType.BOOLEAN.getDataType(), false),
new RecordField("stringField", RecordFieldType.STRING.getDataType(), "Missing field", true),
new RecordField("anyField", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Arrays.asList(
new RecordField("type_url", RecordFieldType.STRING.getDataType()),
new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()))
)))),
new RecordField("extensionField", RecordFieldType.INT.getDataType())
));
final RecordSchema actual = schemaParser.createSchema("Proto2Message");
assertEquals(expected, actual);
}
}

View File

@ -0,0 +1,7 @@
[binary file: compiled FileDescriptorSet for google/protobuf/any.proto, defining message Any with fields type_url and value]

View File

@ -0,0 +1,11 @@
[binary file: compiled FileDescriptorSet for test_proto2.proto, defining Proto2Message, AnyValueMessage and the extensionField extension]

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";
import "google/protobuf/any.proto";
message Proto2Message {
extensions 100 to max;
required bool booleanField = 1;
optional string stringField = 2 [default = "Missing field"];
optional google.protobuf.Any anyField = 3;
}
message AnyValueMessage {
optional string anyStringField1 = 1;
optional string anyStringField2 = 2;
}
extend Proto2Message {
optional int32 extensionField = 100;
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto3";
message Proto3Message {
bool booleanField = 1;
string stringField = 2;
int32 int32Field = 3;
uint32 uint32Field = 4;
sint32 sint32Field = 5;
fixed32 fixed32Field = 6;
sfixed32 sfixed32Field = 7;
double doubleField = 8;
float floatField = 9;
bytes bytesField = 10;
int64 int64Field = 11;
uint64 uint64Field = 12;
sint64 sint64Field = 13;
fixed64 fixed64Field = 14;
sfixed64 sfixed64Field = 15;
NestedMessage nestedMessage = 16;
}
message NestedMessage {
TestEnum testEnum = 20;
repeated string repeatedField = 21;
oneof oneOfField {
string stringOption = 22;
bool booleanOption = 23;
int32 int32Option = 24;
}
map<string, int32> testMap = 25;
}
enum TestEnum {
ENUM_VALUE_1 = 0;
ENUM_VALUE_2 = 1;
ENUM_VALUE_3 = 2;
}

View File

@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-protobuf-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-protobuf-services</module>
<module>nifi-protobuf-services-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>${kotlin.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-common</artifactId>
<version>${kotlin.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>${org.apache.commons.compress.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${org.apache.commons.io.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>${org.apache.commons.text.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${org.apache.commons.lang3.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.bom.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.bom.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.bom.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -110,6 +110,7 @@
<module>nifi-apicurio-bundle</module>
<module>nifi-jolt-bundle</module>
<module>nifi-questdb-bundle</module>
<module>nifi-protobuf-bundle</module>
</modules>
<repositories>