NIFI-4037 added InvokeGRPC processor, with proto service IDL

NIFI-4038 added ListenGRPC processor

This closes #1947

Signed-off-by: Tony Kurc <tkurc@apache.org>
This commit is contained in:
m-hogue 2017-06-14 12:31:35 -04:00 committed by Tony Kurc
parent 3bf1d12706
commit 58a623dfa2
22 changed files with 3393 additions and 93 deletions

View File

@ -1083,7 +1083,7 @@ This product bundles HexViewJS available under an MIT License
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
The binary distribution of this product bundles 'Google Protocol Buffers Java 2.5.0'
The binary distribution of this product bundles 'Google Protocol Buffers Java 2.5.0 and 3.3.1'
which is licensed under a BSD license.
This license applies to all parts of Protocol Buffers except the following:
@ -1129,6 +1129,18 @@ which is licensed under a BSD license.
standalone and requires a support library to be linked with it. This
support library is itself covered by the above license.
The binary distribution of this product bundles 'The JSR-305 reference implementation'
which is licensed under a BSD license.
The JSR-305 reference implementation (lib/jsr305.jar) is
distributed under the terms of the New BSD license:
http://www.opensource.org/licenses/bsd-license.php
See the JSR-305 home page for more information:
http://code.google.com/p/jsr-305/
This product bundles 'JCraft Jzlib' which is available under a 3-Clause BSD License.
Copyright (c) 2002-2014 Atsuhiko Yamanaka, JCraft,Inc.

View File

@ -936,11 +936,6 @@ The following binary components are provided under the Apache Software License v
Curator Recipes
Copyright 2011-2014 The Apache Software Foundation
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project
Copyright 2011 The Netty Project
(ASLv2) Apache Xerces Java
The following NOTICE information applies:
Apache Xerces Java
@ -1114,124 +1109,280 @@ The following binary components are provided under the Apache Software License v
Grok
Copyright 2014 Anthony Corbacho, and contributors.
(ASLv2) gRPC-Java
The following NOTICE information applies:
Copyright 2014, gRPC Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-----------------------------------------------------------------------
This product contains a modified portion of 'OkHttp', an open source
HTTP & SPDY client for Android and Java applications, which can be obtained
at:
* LICENSE:
* okhttp/third_party/okhttp/LICENSE (Apache License 2.0)
* HOMEPAGE:
* https://github.com/square/okhttp
* LOCATION_IN_GRPC:
* okhttp/third_party/okhttp
This product contains a modified portion of 'Netty', an open source
networking library, which can be obtained at:
* LICENSE:
* netty/third_party/netty/LICENSE.txt (Apache License 2.0)
* HOMEPAGE:
* https://netty.io
* LOCATION_IN_GRPC:
* netty/third_party/netty
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project
=================
The Netty Project
=================
Please visit the Netty web site for more information:
Please visit the Netty web site for more information:
* http://netty.io/
* http://netty.io/
Copyright 2011 The Netty Project
Copyright 2014 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified version of 'JZlib', a re-implementation of
zlib in pure Java, which can be obtained at:
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD Style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified version of 'Webbit', a Java event based
WebSocket and HTTP server:
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* http://www.slf4j.org/
This product optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* http://code.google.com/p/protobuf/
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* http://archive.apache.org/dist/harmony/
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product optionally depends on 'SLF4J', a simple logging facade for Java,
which can be obtained at:
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* http://www.slf4j.org/
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging/
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'Apache Log4J', a logging framework,
which can be obtained at:
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* http://logging.apache.org/log4j/
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'JBoss Logging', a logging framework,
which can be obtained at:
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.jboss-logging.txt (GNU LGPL 2.1)
* HOMEPAGE:
* http://anonsvn.jboss.org/repos/common/common-logging-spi/
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'Apache Felix', an open source OSGi
framework implementation, which can be obtained at:
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.felix.txt (Apache License 2.0)
* HOMEPAGE:
* http://felix.apache.org/
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1)
* HOMEPAGE:
* http://www.jboss.org/jbossmarshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* http://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* http://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains a forked and modified version of Tomcat Native
* LICENSE:
* ASL2
* HOMEPAGE:
* http://tomcat.apache.org/native-doc/
* https://svn.apache.org/repos/asf/tomcat/native/
(ASLv2) Error Prone
The following NOTICE information applies:
Copyright 2017 Google Inc.
(ASLv2) Instrumentation
The following NOTICE information applies:
Copyright 2016 Google Inc.
(ASLv2) Google APIs
The following NOTICE information applies:
Copyright 2016 Google Inc.
(ASLv2) Android JSON library
The following NOTICE information applies:

View File

@ -456,6 +456,11 @@
<artifactId>nifi-gcp-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-stateful-analysis-nar</artifactId>

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-grpc-nar</artifactId>
<version>1.4.0-SNAPSHOT</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-processors</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,259 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
The binary distribution of this product bundles 'Google Protocol Buffers Java 3.3.1'
which is licensed under a BSD license.
This license applies to all parts of Protocol Buffers except the following:
- Atomicops support for generic gcc, located in
src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
This file is copyrighted by Red Hat Inc.
- Atomicops support for AIX/POWER, located in
src/google/protobuf/stubs/atomicops_internals_power.h.
This file is copyrighted by Bloomberg Finance LP.
Copyright 2014, Google Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Code generated by the Protocol Buffer compiler is owned by the owner
of the input file used when generating it. This code is not
standalone and requires a support library to be linked with it. This
support library is itself covered by the above license.
The binary distribution of this product bundles 'The JSR-305 reference implementation'
which is licensed under a BSD license.
The JSR-305 reference implementation (lib/jsr305.jar) is
distributed under the terms of the New BSD license:
http://www.opensource.org/licenses/bsd-license.php
See the JSR-305 home page for more information:
http://code.google.com/p/jsr-305/

View File

@ -0,0 +1,302 @@
nifi-grpc-nar
Copyright 2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
******************
Apache Software License v2
******************
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
Copyright 2001-2014 The Apache Software Foundation
This product includes software from the Spring Framework,
under the Apache License 2.0 (see: StringUtils.containsWhitespace())
(ASLv2) gRPC-Java
The following NOTICE information applies:
Copyright 2014, gRPC Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-----------------------------------------------------------------------
This product contains a modified portion of 'OkHttp', an open source
HTTP & SPDY client for Android and Java applications, which can be obtained
at:
* LICENSE:
* okhttp/third_party/okhttp/LICENSE (Apache License 2.0)
* HOMEPAGE:
* https://github.com/square/okhttp
* LOCATION_IN_GRPC:
* okhttp/third_party/okhttp
This product contains a modified portion of 'Netty', an open source
networking library, which can be obtained at:
* LICENSE:
* netty/third_party/netty/LICENSE.txt (Apache License 2.0)
* HOMEPAGE:
* https://netty.io
* LOCATION_IN_GRPC:
* netty/third_party/netty
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project
=================
Please visit the Netty web site for more information:
* http://netty.io/
Copyright 2014 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* http://www.slf4j.org/
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* http://archive.apache.org/dist/harmony/
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1)
* HOMEPAGE:
* http://www.jboss.org/jbossmarshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* http://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* http://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains a forked and modified version of Tomcat Native
* LICENSE:
* ASL2
* HOMEPAGE:
* http://tomcat.apache.org/native-doc/
* https://svn.apache.org/repos/asf/tomcat/native/
(ASLv2) Guava
The following NOTICE information applies:
Guava
Copyright 2015 The Guava Authors
(ASLv2) Google GSON
The following NOTICE information applies:
Copyright 2008 Google Inc.
(ASLv2) Error Prone
The following NOTICE information applies:
Copyright 2017 Google Inc.
(ASLv2) Instrumentation
The following NOTICE information applies:
Copyright 2016 Google Inc.
(ASLv2) Google APIs
The following NOTICE information applies:
Copyright 2016 Google Inc.

View File

@ -0,0 +1,139 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-grpc-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>2.0.3.Final</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<excludes>**/grpc/FlowFileRequest.java,**/grpc/FlowFileReply.java,**/grpc/FFSProto.java,**/grpc/FlowFileServiceGrpc.java</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.2.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.3.0:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.4</version>
<executions>
<execution>
<id>test</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${basedir}/target/generated-sources</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import io.grpc.stub.StreamObserver;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Simple gRPC service that handles receipt of FlowFileRequest messages from external-to-NiFi gRPC
* clients.
*/
// NOTE: you may need to add the sources generated after running `maven clean compile` to your IDE
// configured source directories. Otherwise, the classes generated when the proto is compiled won't
// be accessible from here. For IntelliJ, open this module's settings and mark the following as source directories:
//
// * target/generated-sources/protobuf/grpc-java
// * target/generated-sources/protobuf/java
public class FlowFileIngestService extends FlowFileServiceGrpc.FlowFileServiceImplBase {
public static final String SERVICE_NAME = "grpc://FlowFileIngestService";
public static final int FILES_BEFORE_CHECKING_DESTINATION_SPACE = 5;
private final AtomicLong filesReceived = new AtomicLong(0L);
private final AtomicBoolean spaceAvailable = new AtomicBoolean(true);
private final AtomicReference<ProcessSessionFactory> sessionFactoryReference;
private final ProcessContext context;
private final ComponentLog logger;
/**
* Create a FlowFileIngestService
*
* @param sessionFactoryReference a reference to a {@link ProcessSessionFactory} to route {@link
* FlowFile}s to process relationships.
*/
public FlowFileIngestService(final ComponentLog logger,
final AtomicReference<ProcessSessionFactory> sessionFactoryReference,
final ProcessContext context) {
this.context = checkNotNull(context);
this.sessionFactoryReference = checkNotNull(sessionFactoryReference);
this.logger = checkNotNull(logger);
}
/**
* Handle receipt of a FlowFileRequest and route it to the appropriate process relationship.
*
* @param request the flowfile request
* @param responseObserver the mechanism by which to reply to the client
*/
@Override
public void send(final org.apache.nifi.processors.grpc.FlowFileRequest request, final StreamObserver<FlowFileReply> responseObserver) {
final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder();
final String remoteHost = FlowFileIngestServiceInterceptor.REMOTE_HOST_KEY.get();
final String remoteDN = FlowFileIngestServiceInterceptor.REMOTE_DN_KEY.get();
// block until we have a session factory (occurs when processor is triggered)
ProcessSessionFactory sessionFactory = null;
while (sessionFactory == null) {
sessionFactory = sessionFactoryReference.get();
if (sessionFactory == null) {
try {
Thread.sleep(10);
} catch (final InterruptedException e) {
}
}
}
final ProcessSession session = sessionFactory.createSession();
// if there's no space available, reject the request.
final long n = filesReceived.getAndIncrement() % FILES_BEFORE_CHECKING_DESTINATION_SPACE;
if (n == 0 || !spaceAvailable.get()) {
if (context.getAvailableRelationships().isEmpty()) {
spaceAvailable.set(false);
final String message = "Received request from " + remoteHost + " but no space available; Indicating Service Unavailable";
if (logger.isDebugEnabled()) {
logger.debug(message);
}
final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
.setBody(message)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
return;
} else {
spaceAvailable.set(true);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Received request from " + remoteHost);
}
final long startNanos = System.nanoTime();
FlowFile flowFile = session.create();
// push the attributes provided onto the created flowfile
final Map<String, String> attributes = Maps.newHashMap();
attributes.putAll(request.getAttributesMap());
String sourceSystemFlowFileIdentifier = attributes.get(CoreAttributes.UUID.key());
if (sourceSystemFlowFileIdentifier != null) {
sourceSystemFlowFileIdentifier = "urn:nifi:" + sourceSystemFlowFileIdentifier;
// If we receveied a UUID, we want to give the FlowFile a new UUID and register the sending system's
// identifier as the SourceSystemFlowFileIdentifier field in the Provenance RECEIVE event
attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
}
flowFile = session.putAllAttributes(flowFile, attributes);
final ByteString content = request.getContent();
final InputStream contentStream = content.newInput();
// write the provided content to the flowfile
flowFile = session.write(flowFile, out -> {
try (final BufferedOutputStream bos = new BufferedOutputStream(out, 65536)) {
IOUtils.copy(contentStream, bos);
}
});
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
session.getProvenanceReporter().receive(flowFile,
SERVICE_NAME,
sourceSystemFlowFileIdentifier,
"Remote DN=" + remoteDN,
transferMillis);
flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_HOST, remoteHost);
flowFile = session.putAttribute(flowFile, ListenGRPC.REMOTE_USER_DN, remoteDN);
// register success
session.transfer(flowFile, ListenGRPC.REL_SUCCESS);
session.commit();
// reply to client
final FlowFileReply reply = replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS)
.setBody("FlowFile successfully received.")
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import org.apache.nifi.logging.ComponentLog;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.regex.Pattern;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
import io.grpc.Attributes;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Simple gRPC service call interceptor that enforces various controls
*/
public class FlowFileIngestServiceInterceptor implements ServerInterceptor {
public static final String DEFAULT_FOUND_SUBJECT = "none";
private static final String UNKNOWN_IP = "unknown-ip";
private static final String DN_UNAUTHORIZED = "The client DN does not have permission to post FlowFiles to this NiFi. ";
private static final ServerCall.Listener IDENTITY_LISTENER = new ServerCall.Listener(){};
public static final Context.Key<String> REMOTE_HOST_KEY = Context.key(ListenGRPC.REMOTE_HOST);
public static final Context.Key<String> REMOTE_DN_KEY = Context.key(ListenGRPC.REMOTE_USER_DN);
private final ComponentLog logger;
private Pattern authorizedDNpattern;
/**
* Create an interceptor that applies various controls per request
*
* @param logger the {@link ComponentLog} for the ListenGRPC processor
*/
public FlowFileIngestServiceInterceptor(final ComponentLog logger) {
this.logger = checkNotNull(logger);
}
/**
* Enforce that the requestor DN matches the provided pattern.
*
* @param authorizedDNPattern the pattern which DNs must match
*
* @return this
*/
public FlowFileIngestServiceInterceptor enforceDNPattern(final Pattern authorizedDNPattern) {
this.authorizedDNpattern = checkNotNull(authorizedDNPattern);
return this;
}
/**
* Intercept incoming and outgoing messages and enforce any necessary controls
*
* @param call the request message
* @param headers the request metadata
* @param next the next interceptor in the interceptor chain prior to the service implementation
* @param <I> The message request type (e.g. ReqT)
* @param <O> The message reply type (e.g. RespT)
*
* @return a listener for the incoming call.
*/
@Override
public <I, O> ServerCall.Listener<I> interceptCall(
final ServerCall<I, O> call,
final Metadata headers,
final ServerCallHandler<I, O> next) {
final Attributes attributes = call.getAttributes();
final SocketAddress socketAddress = attributes.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
final String clientIp = clientIp(socketAddress);
String foundSubject = DEFAULT_FOUND_SUBJECT;
// enforce that the DN on the client cert matches the configured pattern
final SSLSession sslSession = attributes.get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
if(this.authorizedDNpattern != null && sslSession != null) {
try {
final X509Certificate[] certs = sslSession.getPeerCertificateChain();
if(certs != null && certs.length > 0) {
for (final X509Certificate cert : certs) {
foundSubject = cert.getSubjectDN().getName();
if(authorizedDNpattern.matcher(foundSubject).matches()) {
break;
} else {
logger.warn("Rejecting transfer attempt from " + foundSubject + " because the DN is not authorized, host=" + clientIp);
call.close(Status.PERMISSION_DENIED.withDescription(DN_UNAUTHORIZED + foundSubject), headers);
return IDENTITY_LISTENER;
}
}
}
} catch (final SSLPeerUnverifiedException e) {
logger.debug("skipping DN authorization for request from {}.", new Object[] {clientIp}, e);
}
}
// contextualize the DN and IP for use in the RPC implementation
final Context context = Context.current()
.withValue(REMOTE_HOST_KEY, clientIp)
.withValue(REMOTE_DN_KEY, foundSubject);
// if we got to this point, there were no errors, call the next interceptor in the chain
return Contexts.interceptCall(context, call, headers, next);
}
/**
* Grabs the client IP from the socket address pulled from the request metadata, or UNKNOWN
* if it's not possible to determine.
*
* @param socketAddress the socket address pulled from the gRPC request
* @return the client IP
*/
private String clientIp(final SocketAddress socketAddress) {
if (socketAddress == null) {
return UNKNOWN_IP;
}
if (!(socketAddress instanceof InetSocketAddress)) {
return socketAddress.toString();
}
final InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
final String hostString = inetSocketAddress.getHostString();
return hostString == null ? UNKNOWN_IP : hostString;
}
}

View File

@ -0,0 +1,442 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import com.google.protobuf.ByteString;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.KeyStore;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContextBuilder;
@SupportsBatching
@Tags({"grpc", "rpc", "client"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends FlowFiles, optionally with content, to a configurable remote gRPC service endpoint. The remote gRPC service must abide by the service IDL defined in NiFi. " +
" gRPC isn't intended to carry large payloads, so this processor should be used only when FlowFile" +
" sizes are on the order of megabytes. The default maximum message size is 4MB.")
@WritesAttributes({
@WritesAttribute(attribute = "invokegrpc.response.code", description = "The response code that is returned (0 = ERROR, 1 = SUCCESS, 2 = RETRY)"),
@WritesAttribute(attribute = "invokegrpc.response.body", description = "The response message that is returned"),
@WritesAttribute(attribute = "invokegrpc.service.host", description = "The remote gRPC service hostname"),
@WritesAttribute(attribute = "invokegrpc.service.port", description = "The remote gRPC service port"),
@WritesAttribute(attribute = "invokegrpc.java.exception.class", description = "The Java exception class raised when the processor fails"),
@WritesAttribute(attribute = "invokegrpc.java.exception.message", description = "The Java exception message raised when the processor fails"),
})
public class InvokeGRPC extends AbstractProcessor {
public static final String RESPONSE_CODE = "invokegrpc.response.code";
public static final String RESPONSE_BODY = "invokegrpc.response.body";
public static final String SERVICE_HOST = "invokegrpc.service.host";
public static final String SERVICE_PORT = "invokegrpc.service.port";
public static final String EXCEPTION_CLASS = "invokegrpc.java.exception.class";
public static final String EXCEPTION_MESSAGE = "invokegrpc.java.exception.message";
// properties
public static final PropertyDescriptor PROP_SERVICE_HOST = new PropertyDescriptor.Builder()
.name("Remote gRPC service hostname")
.description("Remote host which will be connected to")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder()
.name("Remote gRPC service port")
.description("Remote port which will be connected to")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder()
.name("Max Message Size")
.description("The maximum size of FlowFiles that this processor will allow to be received." +
" The default is 4MB. If FlowFiles exceed this size, you should consider using another transport mechanism" +
" as gRPC isn't designed for heavy payloads.")
.defaultValue("4MB")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder()
.name("Use SSL/TLS")
.description("Whether or not to use SSL/TLS to send the contents of the gRPC messages.")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor PROP_SEND_CONTENT = new PropertyDescriptor.Builder()
.name("Send FlowFile Content")
.description("Whether or not to include the FlowFile content in the FlowFileRequest to the gRPC service.")
.required(false)
.defaultValue("true")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_PENALIZE_NO_RETRY = new PropertyDescriptor.Builder()
.name("Penalize on \"No Retry\"")
.description("Enabling this property will penalize FlowFiles that are routed to the \"No Retry\" relationship.")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_OUTPUT_RESPONSE_REGARDLESS = new PropertyDescriptor.Builder()
.name("Always Output Response")
.description("Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is "
+ "or if the processor is configured to put the server response body in the request attribute. In the later configuration a request FlowFile with the "
+ "response body in the attribute and a typical response FlowFile will be emitted to their respective relationships.")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_SERVICE_HOST,
PROP_SERVICE_PORT,
PROP_MAX_MESSAGE_SIZE,
PROP_USE_SECURE,
PROP_SSL_CONTEXT_SERVICE,
PROP_SEND_CONTENT,
PROP_OUTPUT_RESPONSE_REGARDLESS,
PROP_PENALIZE_NO_RETRY
));
// relationships
public static final Relationship REL_SUCCESS_REQ = new Relationship.Builder()
.name("Original")
.description("The original FlowFile will be routed upon success. It will have new attributes detailing the "
+ "success of the request.")
.build();
public static final Relationship REL_RESPONSE = new Relationship.Builder()
.name("Response")
.description("A Response FlowFile will be routed upon success. If the 'Output Response Regardless' property "
+ "is true then the response will be sent to this relationship regardless of the status code received.")
.build();
public static final Relationship REL_RETRY = new Relationship.Builder()
.name("Retry")
.description("The original FlowFile will be routed on any status code that can be retried. It will have new "
+ "attributes detailing the request.")
.build();
public static final Relationship REL_NO_RETRY = new Relationship.Builder()
.name("No Retry")
.description("The original FlowFile will be routed on any status code that should NOT be retried. "
+ "It will have new attributes detailing the request.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("Failure")
.description("The original FlowFile will be routed on any type of connection failure, timeout or general exception. "
+ "It will have new attributes detailing the request.")
.build();
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS_REQ,
REL_NO_RETRY,
REL_RESPONSE,
REL_RETRY,
REL_FAILURE
)));
private static final String USER_AGENT_PREFIX = "NiFi_invokeGRPC";
// NOTE: you may need to add the sources generated after running `maven clean compile` to your IDE
// configured source directories. Otherwise, the classes generated when the proto is compiled won't
// be accessible from here. For IntelliJ, open this module's settings and mark the following as source directories:
//
// * target/generated-sources/protobuf/grpc-java
// * target/generated-sources/protobuf/java
private final AtomicReference<FlowFileServiceGrpc.FlowFileServiceBlockingStub> blockingStubReference = new AtomicReference<>();
private final AtomicReference<ManagedChannel> channelReference = new AtomicReference<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
/**
* Whenever this processor is triggered, we need to construct a client in order to communicate
* with the configured gRPC service.
*
* @param context the processor context
*/
@OnScheduled
public void initializeClient(final ProcessContext context) throws Exception {
channelReference.set(null);
blockingStubReference.set(null);
final ComponentLog logger = getLogger();
final String host = context.getProperty(PROP_SERVICE_HOST).getValue();
final int port = context.getProperty(PROP_SERVICE_PORT).asInteger();
final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
String userAgent = USER_AGENT_PREFIX;
try {
userAgent += "_" + InetAddress.getLocalHost().getHostName();
} catch (final UnknownHostException e) {
logger.warn("Unable to determine local hostname. Defaulting gRPC user agent to {}.", new Object[]{USER_AGENT_PREFIX}, e);
}
final NettyChannelBuilder nettyChannelBuilder = NettyChannelBuilder.forAddress(host, port)
// supports both gzip and plaintext, but will compress by default.
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.maxInboundMessageSize(maxMessageSize)
.userAgent(userAgent);
// configure whether or not we're using secure comms
final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
if (useSecure && sslContext != null) {
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
if(StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) {
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(),
sslContext.getProvider());
final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType());
try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) {
keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray());
}
keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray());
sslContextBuilder.keyManager(keyManagerFactory);
}
if(StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(),
sslContext.getProvider());
final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType());
try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) {
trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray());
}
trustManagerFactory.init(trustStore);
sslContextBuilder.trustManager(trustManagerFactory);
}
nettyChannelBuilder.sslContext(sslContextBuilder.build());
} else {
nettyChannelBuilder.usePlaintext(true);
}
final ManagedChannel channel = nettyChannelBuilder.build();
final FlowFileServiceGrpc.FlowFileServiceBlockingStub blockingStub = FlowFileServiceGrpc.newBlockingStub(channel);
channelReference.set(channel);
blockingStubReference.set(blockingStub);
}
/**
* Perform cleanup prior to JVM shutdown
*
* @param context the processor context
* @throws InterruptedException if there's an issue cleaning up
*/
@OnShutdown
public void shutdown(final ProcessContext context) throws InterruptedException {
// close the channel
final ManagedChannel channel = channelReference.get();
if (channel != null) {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile fileToProcess = null;
if (context.hasIncomingConnection()) {
fileToProcess = session.get();
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
if (fileToProcess == null && context.hasNonLoopConnection()) {
return;
}
}
final ComponentLog logger = getLogger();
final FlowFileServiceGrpc.FlowFileServiceBlockingStub blockingStub = blockingStubReference.get();
final String host = context.getProperty(PROP_SERVICE_HOST).getValue();
final String port = context.getProperty(PROP_SERVICE_PORT).getValue();
fileToProcess = session.putAttribute(fileToProcess, SERVICE_HOST, host);
fileToProcess = session.putAttribute(fileToProcess, SERVICE_PORT, port);
FlowFile responseFlowFile = null;
try {
final FlowFileRequest.Builder requestBuilder = FlowFileRequest.newBuilder()
.setId(fileToProcess.getId())
.putAllAttributes(fileToProcess.getAttributes());
// if the processor is configured to send the content, turn the content into bytes
// and add it to the request.
final boolean sendContent = context.getProperty(PROP_SEND_CONTENT).asBoolean();
if (sendContent) {
try (final InputStream contents = session.read(fileToProcess)) {
requestBuilder.setContent(ByteString.readFrom(contents));
}
// emit provenance event
session.getProvenanceReporter().send(fileToProcess, getRemote(host, port), true);
}
final FlowFileRequest flowFileRequest = requestBuilder.build();
logRequest(logger, host, port, flowFileRequest);
final FlowFileReply flowFileReply = blockingStub.send(flowFileRequest);
logReply(logger, host, port, flowFileReply);
final FlowFileReply.ResponseCode responseCode = flowFileReply.getResponseCode();
final String body = flowFileReply.getBody();
fileToProcess = session.putAttribute(fileToProcess, RESPONSE_CODE, String.valueOf(responseCode));
fileToProcess = session.putAttribute(fileToProcess, RESPONSE_BODY, body);
responseFlowFile = session.create(fileToProcess);
route(fileToProcess, responseFlowFile, session, context, responseCode);
} catch (final Exception e) {
// penalize or yield
if (fileToProcess != null) {
logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, e);
fileToProcess = session.penalize(fileToProcess);
fileToProcess = session.putAttribute(fileToProcess, EXCEPTION_CLASS, e.getClass().getName());
fileToProcess = session.putAttribute(fileToProcess, EXCEPTION_MESSAGE, e.getMessage());
// transfer original to failure
session.transfer(fileToProcess, REL_FAILURE);
} else {
logger.error("Yielding processor due to exception encountered as a source processor: {}", e);
context.yield();
}
// cleanup
try {
if (responseFlowFile != null) {
session.remove(responseFlowFile);
}
} catch (final Exception e1) {
logger.error("Could not cleanup response flowfile due to exception: {}", new Object[]{e1}, e1);
}
}
}
/**
* Route the {@link FlowFile} request and response appropriately, depending on the gRPC service
* response code.
*
* @param request the flowfile request
* @param response the flowfile response
* @param session the processor session
* @param context the processor context
* @param responseCode the gRPC service response code
*/
private void route(FlowFile request, FlowFile response, final ProcessSession session, final ProcessContext context, final FlowFileReply.ResponseCode responseCode) {
boolean responseSent = false;
if (context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS).asBoolean()) {
session.transfer(response, REL_RESPONSE);
responseSent = true;
}
switch (responseCode) {
// if the rpc failed, transfer flowfile to no retry relationship and penalize flowfile
// if the rpc succeeded, transfer the request and response flowfiles
case SUCCESS:
session.transfer(request, REL_SUCCESS_REQ);
if (!responseSent) {
session.transfer(response, REL_RESPONSE);
}
break;
// if the gRPC service responded requesting a retry, then penalize the request and
// transfer it to the retry relationship. The flowfile contains attributes detailing this
// rpc request.
case RETRY:
request = session.penalize(request);
session.transfer(request, REL_RETRY);
// if we haven't sent the response by this point, clean it up.
if (!responseSent) {
session.remove(response);
}
break;
case ERROR:
case UNRECOGNIZED: // unrecognized response code returned from gRPC service
default:
final boolean penalize = context.getProperty(PROP_PENALIZE_NO_RETRY).asBoolean();
if (penalize) {
request = session.penalize(request);
}
session.transfer(request, REL_NO_RETRY);
// if we haven't sent the response by this point, clean it up.
if (!responseSent) {
session.remove(response);
}
break;
}
}
private String getRemote(final String host, final String port) {
return host + ":" + port;
}
private void logRequest(final ComponentLog logger, final String host, final String port, final FlowFileRequest flowFileRequest) {
logger.debug("\nRequest to remote service:\n\t{}\n{}",
new Object[]{getRemote(host, port), flowFileRequest.toString()});
}
private void logReply(final ComponentLog logger, final String host, final String port, final FlowFileReply flowFileReply) {
logger.debug("\nResponse from remote service:\n\t{}\n{}",
new Object[]{getRemote(host, port), flowFileReply.toString()});
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Starts a gRPC server and listens on the given port to transform the incoming messages into FlowFiles." +
" The message format is defined by the standard gRPC protobuf IDL provided by NiFi. gRPC isn't intended to carry large payloads," +
" so this processor should be used only when FlowFile sizes are on the order of megabytes. The default maximum message size is 4MB.")
@Tags({"ingest", "grpc", "rpc", "listen"})
@WritesAttributes({
@WritesAttribute(attribute = "listengrpc.remote.user.dn", description = "The DN of the user who sent the FlowFile to this NiFi"),
@WritesAttribute(attribute = "listengrpc.remote.host", description = "The IP of the client who sent the FlowFile to this NiFi")
})
public class ListenGRPC extends AbstractSessionFactoryProcessor {
public static final String REMOTE_USER_DN = "listengrpc.remote.user.dn";
public static final String REMOTE_HOST = "listengrpc.remote.host";
// properties
public static final PropertyDescriptor PROP_SERVICE_PORT = new PropertyDescriptor.Builder()
.name("Local gRPC service port")
.description("The local port that the gRPC service will listen on.")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder()
.name("Use SSL/TLS")
.description("Whether or not to use SSL/TLS to send the contents of the gRPC messages.")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor PROP_FLOW_CONTROL_WINDOW = new PropertyDescriptor.Builder()
.name("Flow Control Window")
.description("The initial HTTP/2 flow control window for both new streams and overall connection." +
" Flow-control schemes ensure that streams on the same connection do not destructively interfere with each other." +
" The default is 1MB.")
.defaultValue("1MB")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder()
.name("Max Message Size")
.description("The maximum size of FlowFiles that this processor will allow to be received." +
" The default is 4MB. If FlowFiles exceed this size, you should consider using another transport mechanism" +
" as gRPC isn't designed for heavy payloads.")
.defaultValue("4MB")
.required(false)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
.name("Authorized DN Pattern")
.description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
.required(true)
.defaultValue(".*")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
PROP_SERVICE_PORT,
PROP_USE_SECURE,
PROP_SSL_CONTEXT_SERVICE,
PROP_FLOW_CONTROL_WINDOW,
PROP_AUTHORIZED_DN_PATTERN,
PROP_MAX_MESSAGE_SIZE
));
// relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("Success")
.description("The FlowFile was received successfully.")
.build();
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(Sets.newHashSet(Arrays.asList(
REL_SUCCESS
)));
private final AtomicReference<ProcessSessionFactory> sessionFactoryReference = new AtomicReference<>();
private volatile Server server = null;
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@OnScheduled
public void startServer(final ProcessContext context) throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, UnrecoverableKeyException {
final ComponentLog logger = getLogger();
// gather configured properties
final Integer port = context.getProperty(PROP_SERVICE_PORT).asInteger();
final Boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
final Integer flowControlWindow = context.getProperty(PROP_FLOW_CONTROL_WINDOW).asDataSize(DataUnit.B).intValue();
final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
final Pattern authorizedDnPattern = Pattern.compile(context.getProperty(PROP_AUTHORIZED_DN_PATTERN).getValue());
final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger());
callInterceptor.enforceDNPattern(authorizedDnPattern);
final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(),
sessionFactoryReference,
context);
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port)
.addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor))
// default (de)compressor registries handle both plaintext and gzip compressed messages
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.flowControlWindow(flowControlWindow)
.maxMessageSize(maxMessageSize);
if (useSecure && sslContext != null) {
// construct key manager
if (StringUtils.isBlank(sslContextService.getKeyStoreFile())) {
throw new IllegalStateException("SSL is enabled, but no keystore has been configured. You must configure a keystore.");
}
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(),
sslContext.getProvider());
final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType());
try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) {
keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray());
}
keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray());
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManagerFactory);
// if the trust store is configured, then client auth is required.
if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(),
sslContext.getProvider());
final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType());
try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) {
trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray());
}
trustManagerFactory.init(trustStore);
sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory);
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
} else {
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.NONE);
}
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
serverBuilder = serverBuilder.sslContext(sslContextBuilder.build());
}
logger.info("Starting gRPC server on port: {}", new Object[]{port.toString()});
this.server = serverBuilder.build().start();
}
@OnStopped
public void stopServer(final ProcessContext context) {
if (this.server != null) {
try {
this.server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
getLogger().warn("Unable to cleanly shutdown embedded gRPC server due to {}", new Object[]{e});
this.server = null;
}
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
sessionFactoryReference.compareAndSet(null, sessionFactory);
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.grpc.InvokeGRPC
org.apache.nifi.processors.grpc.ListenGRPC

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// NOTE: you may need to add the sources generated when running `maven clean compile` to your IDE
// configured source directories. Otherwise, the classes generated when the proto is compiled won't
// be accessible. For IntelliJ, open this module's settings and mark the following as source directories:
//
// * target/generated-sources/protobuf/grpc-java
// * target/generated-sources/protobuf/java
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.nifi.processors.grpc";
option java_outer_classname = "FFSProto";
option objc_class_prefix = "FFS";
package org.apache.nifi.processors.grpc;
// The FlowFile service definition.
service FlowFileService {
// Sends a FlowFile (blocking rpc)
rpc Send (FlowFileRequest) returns (FlowFileReply) {}
}
// The request message
message FlowFileRequest {
// tags 1-15 require one byte to encode and should be left for commonly occurring tags.
// For that reason, tags 3-14 are left available.
//
// source: https://developers.google.com/protocol-buffers/docs/proto3#assigning-tags
int64 id = 1;
map<string, string> attributes = 2;
bytes content = 15;
}
// the reply message
message FlowFileReply {
enum ResponseCode {
ERROR = 0;
SUCCESS = 1;
RETRY = 2;
}
ResponseCode responseCode = 1;
string body = 2;
}

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import org.apache.nifi.ssl.StandardSSLContextService;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
import static org.apache.nifi.processors.grpc.TestGRPCServer.NEED_CLIENT_AUTH;
/**
* Generic gRPC channel builder for use in unit testing. Consumers should use the channel built here
* to construct the desired stubs to communicate with a gRPC service.
*/
public class TestGRPCClient {
// Used to represent the ephemeral port range.
private static final int PORT_START = 49152;
private static final int PORT_END = 65535;
/**
* Can be used by clients to grab a random port in a range of ports
*
* @return a port to use for client/server comms
*/
public static int randomPort() {
// add 1 because upper bound is exclusive
return ThreadLocalRandom.current().nextInt(PORT_START, PORT_END + 1);
}
/**
* Build a channel with the given host and port and optional ssl properties.
*
* @param host the host to establish a connection with
* @param port the port on which to communicate with the host
* @return a constructed channel
*/
public static ManagedChannel buildChannel(final String host, final int port)
throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, UnrecoverableKeyException {
return buildChannel(host, port, null);
}
/**
* Build a channel with the given host and port and optional ssl properties.
*
* @param host the host to establish a connection with
* @param port the port on which to communicate with the host
* @param sslProperties the properties by which to establish an ssl connection
* @return a constructed channel
*/
public static ManagedChannel buildChannel(final String host, final int port, final Map<String, String> sslProperties)
throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, UnrecoverableKeyException {
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port)
.directExecutor()
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.userAgent("testAgent");
if (sslProperties != null) {
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
if(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
final KeyStore keyStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
final String keyStoreFile = sslProperties.get(StandardSSLContextService.KEYSTORE.getName());
final String keyStorePassword = sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName());
try (final InputStream is = new FileInputStream(keyStoreFile)) {
keyStore.load(is, keyStorePassword.toCharArray());
}
keyManager.init(keyStore, keyStorePassword.toCharArray());
sslContextBuilder = sslContextBuilder.keyManager(keyManager);
}
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
final KeyStore trustStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
final String trustStoreFile = sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName());
final String trustStorePassword = sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName());
try (final InputStream is = new FileInputStream(trustStoreFile)) {
trustStore.load(is, trustStorePassword.toCharArray());
}
trustManagerFactory.init(trustStore);
sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory);
}
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
if (clientAuth == null) {
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
} else {
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
}
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
channelBuilder = channelBuilder.sslContext(sslContextBuilder.build());
} else {
channelBuilder.usePlaintext(true);
}
return channelBuilder.build();
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.security.KeyStore;
import java.util.Map;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.nifi.ssl.StandardSSLContextService;
import io.grpc.BindableService;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Server;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
/**
* Generic gRPC test server to assist with unit tests that require a server to be present.
*
* @param <T> the gRPC service implementation
*/
public class TestGRPCServer<T extends BindableService> {
public static final String HOST = "localhost";
public static final String NEED_CLIENT_AUTH = "needClientAuth";
private final Class<T> clazz;
private Server server;
private Map<String, String> sslProperties;
/**
* Create a gRPC server
*
* @param clazz the gRPC service implementation
*/
public TestGRPCServer(final Class<T> clazz) {
this(clazz, null);
}
/**
* Create a gRPC server
*
* @param clazz the gRPC service implementation
* @param sslProperties the keystore and truststore properties for SSL communications
*/
public TestGRPCServer(final Class<T> clazz, final Map<String, String> sslProperties) {
this.clazz = clazz;
this.sslProperties = sslProperties;
}
/**
* Can be used by clients to grab a random port in a range of ports
*
* @return a port to use for client/server comms
*/
public static int randomPort() throws IOException {
ServerSocket socket = new ServerSocket(0);
socket.setReuseAddress(true);
final int port = socket.getLocalPort();
socket.close();
return port;
}
/**
* Starts the gRPC server @localhost:port.
*/
public void start(final int port) throws Exception {
final NettyServerBuilder nettyServerBuilder = NettyServerBuilder
.forPort(port)
.directExecutor()
.addService(clazz.newInstance())
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance());
if (this.sslProperties != null) {
if (sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) == null) {
throw new RuntimeException("You must configure a keystore in order to use SSL with gRPC.");
}
final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
final KeyStore keyStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.KEYSTORE_TYPE.getName()));
final String keyStoreFile = sslProperties.get(StandardSSLContextService.KEYSTORE.getName());
final String keyStorePassword = sslProperties.get(StandardSSLContextService.KEYSTORE_PASSWORD.getName());
try (final InputStream is = new FileInputStream(keyStoreFile)) {
keyStore.load(is, keyStorePassword.toCharArray());
}
keyManager.init(keyStore, keyStorePassword.toCharArray());
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManager);
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
final KeyStore trustStore = KeyStore.getInstance(sslProperties.get(StandardSSLContextService.TRUSTSTORE_TYPE.getName()));
final String trustStoreFile = sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName());
final String trustStorePassword = sslProperties.get(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName());
try (final InputStream is = new FileInputStream(trustStoreFile)) {
trustStore.load(is, trustStorePassword.toCharArray());
}
trustManagerFactory.init(trustStore);
sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory);
}
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
if (clientAuth == null) {
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
} else {
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
}
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
nettyServerBuilder.sslContext(sslContextBuilder.build());
}
server = nettyServerBuilder.build().start();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
TestGRPCServer.this.stop();
System.err.println("*** server shut down");
}
});
}
/**
* Stop the server.
*/
void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
}

View File

@ -0,0 +1,557 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.grpc.stub.StreamObserver;
import io.netty.handler.ssl.ClientAuth;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
public class TestInvokeGRPC {
// ids placed on flowfiles and used to dictate response codes in the DummyFlowFileService below
private static final long ERROR = 500;
private static final long SUCCESS = 501;
private static final long RETRY = 502;
@Test
public void testSuccess() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile successFile = successFiles.get(0);
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testSuccessWithFlowFileContent() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
runner.enqueue("content");
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "content");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile successFile = successFiles.get(0);
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "content");
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testSuccessAlwaysOutputResponse() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true");
final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile successFile = successFiles.get(0);
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testExceedMaxMessageSize() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
// set max message size to 1B to force error
runner.setProperty(InvokeGRPC.PROP_MAX_MESSAGE_SIZE, "1B");
final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
// an exception should be thrown indicating that the max message size was exceeded.
response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, "io.grpc.StatusRuntimeException");
} finally {
server.stop();
}
}
@Test
public void testRetry() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
final MockFlowFile mockFlowFile = new MockFlowFile(RETRY);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
runner.assertPenalizeCount(1);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testRetryAlwaysOutputResponse() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true");
final MockFlowFile mockFlowFile = new MockFlowFile(RETRY);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 1);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
runner.assertPenalizeCount(1);
final List<MockFlowFile> retryFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RETRY);
assertThat(retryFiles.size(), equalTo(1));
final MockFlowFile retry = retryFiles.get(0);
retry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY));
retry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
retry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
retry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.RETRY));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "retry");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testNoRetryOnError() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
final MockFlowFile mockFlowFile = new MockFlowFile(ERROR);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testNoRetryOnErrorAlwaysOutputResponseAndPenalize() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
runner.setProperty(InvokeGRPC.PROP_OUTPUT_RESPONSE_REGARDLESS, "true");
runner.setProperty(InvokeGRPC.PROP_PENALIZE_NO_RETRY, "true");
final MockFlowFile mockFlowFile = new MockFlowFile(ERROR);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 1);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
runner.assertPenalizeCount(1);
final List<MockFlowFile> noRetryFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_NO_RETRY);
assertThat(noRetryFiles.size(), equalTo(1));
final MockFlowFile noRetry = noRetryFiles.get(0);
noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR));
noRetry.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
noRetry.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.ERROR));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "error");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testNoInput() throws Exception {
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class);
try {
final int port = TestGRPCServer.randomPort();
server.start(port);
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
runner.assertPenalizeCount(0);
} finally {
server.stop();
}
}
@Test
public void testServerConnectionFail() throws Exception {
final int port = TestGRPCServer.randomPort();
// should be no gRPC server running @ that port, so processor will fail
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, Integer.toString(port));
final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 0);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 0);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 1);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_FAILURE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, Integer.toString(port));
response.assertAttributeEquals(InvokeGRPC.EXCEPTION_CLASS, "io.grpc.StatusRuntimeException");
}
@Test
public void testSecureTwoWaySsl() throws Exception {
final Map<String, String> sslProperties = getKeystoreProperties();
sslProperties.putAll(getTruststoreProperties());
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class, sslProperties);
try {
final int port = TestGRPCServer.randomPort();
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true");
useSSLContextService(runner, sslProperties);
server.start(port);
final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile successFile = successFiles.get(0);
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
@Test
public void testSecureOneWaySsl() throws Exception {
final Map<String, String> sslProperties = getKeystoreProperties();
sslProperties.put(TestGRPCServer.NEED_CLIENT_AUTH, ClientAuth.NONE.name());
final TestGRPCServer<DummyFlowFileService> server = new TestGRPCServer<>(DummyFlowFileService.class, sslProperties);
try {
final int port = TestGRPCServer.randomPort();
final TestRunner runner = TestRunners.newTestRunner(InvokeGRPC.class);
runner.setProperty(InvokeGRPC.PROP_SERVICE_HOST, TestGRPCServer.HOST);
runner.setProperty(InvokeGRPC.PROP_SERVICE_PORT, String.valueOf(port));
runner.setProperty(InvokeGRPC.PROP_USE_SECURE, "true");
useSSLContextService(runner, getTruststoreProperties());
server.start(port);
final MockFlowFile mockFlowFile = new MockFlowFile(SUCCESS);
runner.enqueue(mockFlowFile);
runner.run();
runner.assertTransferCount(InvokeGRPC.REL_RESPONSE, 1);
runner.assertTransferCount(InvokeGRPC.REL_SUCCESS_REQ, 1);
runner.assertTransferCount(InvokeGRPC.REL_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_NO_RETRY, 0);
runner.assertTransferCount(InvokeGRPC.REL_FAILURE, 0);
final List<MockFlowFile> responseFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_RESPONSE);
assertThat(responseFiles.size(), equalTo(1));
final MockFlowFile response = responseFiles.get(0);
response.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
response.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
response.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
response.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(InvokeGRPC.REL_SUCCESS_REQ);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile successFile = successFiles.get(0);
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_CODE, String.valueOf(FlowFileReply.ResponseCode.SUCCESS));
successFile.assertAttributeEquals(InvokeGRPC.RESPONSE_BODY, "success");
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_HOST, TestGRPCServer.HOST);
successFile.assertAttributeEquals(InvokeGRPC.SERVICE_PORT, String.valueOf(port));
} finally {
server.stop();
}
}
private static Map<String, String> getTruststoreProperties() {
final Map<String, String> props = new HashMap<>();
props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
return props;
}
private static Map<String, String> getKeystoreProperties() {
final Map<String, String> properties = new HashMap<>();
properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
return properties;
}
private void useSSLContextService(final TestRunner controller, final Map<String, String> sslProperties) {
final SSLContextService service = new StandardSSLContextService();
try {
controller.addControllerService("ssl-service", service, sslProperties);
controller.enableControllerService(service);
} catch (InitializationException ex) {
ex.printStackTrace();
Assert.fail("Could not create SSL Context Service");
}
controller.setProperty(InvokeGRPC.PROP_SSL_CONTEXT_SERVICE, "ssl-service");
}
/**
* Dummy gRPC service whose responses are dictated by the IDs on the messages it receives
*/
private static class DummyFlowFileService extends FlowFileServiceGrpc.FlowFileServiceImplBase {
public DummyFlowFileService() {
}
@Override
public void send(FlowFileRequest request, StreamObserver<FlowFileReply> responseObserver) {
final FlowFileReply.Builder replyBuilder = FlowFileReply.newBuilder();
// use the id to dictate response codes
final long id = request.getId();
if (id == ERROR) {
replyBuilder.setResponseCode(FlowFileReply.ResponseCode.ERROR)
.setBody("error");
} else if (id == SUCCESS) {
replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS)
.setBody("success");
} else if (id == RETRY) {
replyBuilder.setResponseCode(FlowFileReply.ResponseCode.RETRY)
.setBody("retry");
// else, assume the request is to include the flowfile content in the response
} else {
replyBuilder.setResponseCode(FlowFileReply.ResponseCode.SUCCESS)
.setBody(request.getContent().toStringUtf8());
}
responseObserver.onNext(replyBuilder.build());
responseObserver.onCompleted();
}
}
}

View File

@ -0,0 +1,401 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.grpc.ManagedChannel;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class TestListenGRPC {
private static final String HOST = "localhost";
private static final String CERT_DN = "CN=localhost, OU=Apache NiFi, O=Apache, L=Santa Monica, ST=CA, C=US";
private static final String SOURCE_SYSTEM_UUID = "FAKE_UUID";
private static Map<String, String> getTruststoreProperties() {
final Map<String, String> props = new HashMap<>();
props.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
props.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
props.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
return props;
}
private static Map<String, String> getKeystoreProperties() {
final Map<String, String> properties = new HashMap<>();
properties.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
properties.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
properties.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
return properties;
}
private static void useSSLContextService(final TestRunner controller, final Map<String, String> sslProperties) {
final SSLContextService service = new StandardSSLContextService();
try {
controller.addControllerService("ssl-service", service, sslProperties);
controller.enableControllerService(service);
} catch (InitializationException ex) {
ex.printStackTrace();
Assert.fail("Could not create SSL Context Service");
}
controller.setProperty(InvokeGRPC.PROP_SSL_CONTEXT_SERVICE, "ssl-service");
}
@Test
public void testSuccessfulRoundTrip() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
final int randPort = TestGRPCClient.randomPort();
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, String.valueOf(randPort));
final ProcessContext processContext = runner.getProcessContext();
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
try {
// start the server. The order of the following statements shouldn't matter, because the
// startServer() method waits for a processSessionFactory to be available to it.
listenGRPC.startServer(processContext);
listenGRPC.onTrigger(processContext, processSessionFactory);
final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
.putAttributes("FOO", "BAR")
.putAttributes(CoreAttributes.UUID.key(), SOURCE_SYSTEM_UUID)
.setContent(ByteString.copyFrom("content".getBytes()))
.build();
final FlowFileReply reply = stub.send(ingestFile);
assertThat(reply.getResponseCode(), equalTo(FlowFileReply.ResponseCode.SUCCESS));
assertThat(reply.getBody(), equalTo("FlowFile successfully received."));
runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 1);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListenGRPC.REL_SUCCESS);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
} finally {
// stop the server
listenGRPC.stopServer(processContext);
channel.shutdown();
}
}
@Test
public void testOutOfSpaceRoundTrip() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
final int randPort = TestGRPCClient.randomPort();
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, String.valueOf(randPort));
final ProcessContext processContext = spy(runner.getProcessContext());
// force the context to return that space isn't available, prompting an error message to be returned.
when(processContext.getAvailableRelationships()).thenReturn(Sets.newHashSet());
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
try {
// start the server. The order of the following statements shouldn't matter, because the
// startServer() method waits for a processSessionFactory to be available to it.
listenGRPC.startServer(processContext);
listenGRPC.onTrigger(processContext, processSessionFactory);
final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
.putAttributes("FOO", "BAR")
.setContent(ByteString.copyFrom("content".getBytes()))
.build();
final FlowFileReply reply = stub.send(ingestFile);
assertThat(reply.getResponseCode(), equalTo(FlowFileReply.ResponseCode.ERROR));
assertThat(reply.getBody(), containsString("but no space available; Indicating Service Unavailable"));
runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 0);
} finally {
// stop the server
listenGRPC.stopServer(processContext);
channel.shutdown();
}
}
@Test(expected = io.grpc.StatusRuntimeException.class)
public void testExceedMaxMessageSize() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
final int randPort = TestGRPCClient.randomPort();
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, String.valueOf(randPort));
// set max message size to 1 byte to force exception to be thrown.
runner.setProperty(ListenGRPC.PROP_MAX_MESSAGE_SIZE, "1B");
final ProcessContext processContext = runner.getProcessContext();
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
try {
// start the server. The order of the following statements shouldn't matter, because the
// startServer() method waits for a processSessionFactory to be available to it.
listenGRPC.startServer(processContext);
listenGRPC.onTrigger(processContext, processSessionFactory);
final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
.putAttributes("FOO", "BAR")
.putAttributes(CoreAttributes.UUID.key(), SOURCE_SYSTEM_UUID)
.setContent(ByteString.copyFrom("content".getBytes()))
.build();
// this should throw a runtime exception
final FlowFileReply reply = stub.send(ingestFile);
assertThat(reply.getResponseCode(), equalTo(FlowFileReply.ResponseCode.SUCCESS));
assertThat(reply.getBody(), equalTo("FlowFile successfully received."));
runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 1);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListenGRPC.REL_SUCCESS);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
} finally {
// stop the server
listenGRPC.stopServer(processContext);
channel.shutdown();
}
}
@Test
public void testSecureTwoWaySSL() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
final int randPort = TestGRPCClient.randomPort();
final Map<String, String> sslProperties = getKeystoreProperties();
sslProperties.putAll(getTruststoreProperties());
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort, sslProperties);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, String.valueOf(randPort));
runner.setProperty(ListenGRPC.PROP_USE_SECURE, "true");
useSSLContextService(runner, sslProperties);
final ProcessContext processContext = runner.getProcessContext();
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
try {
// start the server. The order of the following statements shouldn't matter, because the
// startServer() method waits for a processSessionFactory to be available to it.
listenGRPC.startServer(processContext);
listenGRPC.onTrigger(processContext, processSessionFactory);
final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
.putAttributes("FOO", "BAR")
.setContent(ByteString.copyFrom("content".getBytes()))
.build();
final FlowFileReply reply = stub.send(ingestFile);
assertThat(reply.getResponseCode(), equalTo(FlowFileReply.ResponseCode.SUCCESS));
assertThat(reply.getBody(), equalTo("FlowFile successfully received."));
runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 1);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListenGRPC.REL_SUCCESS);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(CERT_DN));
} finally {
// stop the server
listenGRPC.stopServer(processContext);
channel.shutdown();
}
}
@Test
public void testSecureOneWaySSL() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, InterruptedException {
final int randPort = TestGRPCClient.randomPort();
final Map<String, String> sslProperties = getTruststoreProperties();
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort, sslProperties);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, String.valueOf(randPort));
runner.setProperty(ListenGRPC.PROP_USE_SECURE, "true");
useSSLContextService(runner, getKeystoreProperties());
final ProcessContext processContext = runner.getProcessContext();
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
try {
// start the server. The order of the following statements shouldn't matter, because the
// startServer() method waits for a processSessionFactory to be available to it.
listenGRPC.startServer(processContext);
listenGRPC.onTrigger(processContext, processSessionFactory);
final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
.putAttributes("FOO", "BAR")
.setContent(ByteString.copyFrom("content".getBytes()))
.build();
final FlowFileReply reply = stub.send(ingestFile);
assertThat(reply.getResponseCode(), equalTo(FlowFileReply.ResponseCode.SUCCESS));
assertThat(reply.getBody(), equalTo("FlowFile successfully received."));
// known race condition spot: grpc reply vs flowfile transfer
Thread.sleep(10);
runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 1);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListenGRPC.REL_SUCCESS);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(FlowFileIngestServiceInterceptor.DEFAULT_FOUND_SUBJECT));
} finally {
// stop the server
listenGRPC.stopServer(processContext);
channel.shutdown();
}
}
@Test(expected = io.grpc.StatusRuntimeException.class)
public void testSecureTwoWaySSLFailAuthorizedDNCheck() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
final int randPort = TestGRPCClient.randomPort();
final Map<String, String> sslProperties = getKeystoreProperties();
sslProperties.putAll(getTruststoreProperties());
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort, sslProperties);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, String.valueOf(randPort));
runner.setProperty(ListenGRPC.PROP_USE_SECURE, "true");
runner.setProperty(ListenGRPC.PROP_AUTHORIZED_DN_PATTERN, "CN=FAKE.*");
useSSLContextService(runner, sslProperties);
final ProcessContext processContext = runner.getProcessContext();
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
try {
// start the server. The order of the following statements shouldn't matter, because the
// startServer() method waits for a processSessionFactory to be available to it.
listenGRPC.startServer(processContext);
listenGRPC.onTrigger(processContext, processSessionFactory);
final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
.putAttributes("FOO", "BAR")
.setContent(ByteString.copyFrom("content".getBytes()))
.build();
final FlowFileReply reply = stub.send(ingestFile);
assertThat(reply.getResponseCode(), equalTo(FlowFileReply.ResponseCode.SUCCESS));
assertThat(reply.getBody(), equalTo("FlowFile successfully received."));
runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 1);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListenGRPC.REL_SUCCESS);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(CERT_DN));
} finally {
// stop the server
listenGRPC.stopServer(processContext);
channel.shutdown();
}
}
@Test
public void testSecureTwoWaySSLPassAuthorizedDNCheck() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
final int randPort = TestGRPCClient.randomPort();
final Map<String, String> sslProperties = getKeystoreProperties();
sslProperties.putAll(getTruststoreProperties());
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort, sslProperties);
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
final ListenGRPC listenGRPC = new ListenGRPC();
final TestRunner runner = TestRunners.newTestRunner(listenGRPC);
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, String.valueOf(randPort));
runner.setProperty(ListenGRPC.PROP_USE_SECURE, "true");
runner.setProperty(ListenGRPC.PROP_AUTHORIZED_DN_PATTERN, "CN=localhost.*");
useSSLContextService(runner, sslProperties);
final ProcessContext processContext = runner.getProcessContext();
final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory();
try {
// start the server. The order of the following statements shouldn't matter, because the
// startServer() method waits for a processSessionFactory to be available to it.
listenGRPC.startServer(processContext);
listenGRPC.onTrigger(processContext, processSessionFactory);
final FlowFileRequest ingestFile = FlowFileRequest.newBuilder()
.putAttributes("FOO", "BAR")
.setContent(ByteString.copyFrom("content".getBytes()))
.build();
final FlowFileReply reply = stub.send(ingestFile);
assertThat(reply.getResponseCode(), equalTo(FlowFileReply.ResponseCode.SUCCESS));
assertThat(reply.getBody(), equalTo("FlowFile successfully received."));
runner.assertTransferCount(ListenGRPC.REL_SUCCESS, 1);
final List<MockFlowFile> successFiles = runner.getFlowFilesForRelationship(ListenGRPC.REL_SUCCESS);
assertThat(successFiles.size(), equalTo(1));
final MockFlowFile mockFlowFile = successFiles.get(0);
assertThat(mockFlowFile.getAttribute("FOO"), equalTo("BAR"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_HOST), equalTo("127.0.0.1"));
assertThat(mockFlowFile.getAttribute(ListenGRPC.REMOTE_USER_DN), equalTo(CERT_DN));
} finally {
// stop the server
listenGRPC.stopServer(processContext);
channel.shutdown();
}
}
}

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.4.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-bundle</artifactId>
<version>1.4.0-SNAPSHOT</version>
<packaging>pom</packaging>
<description>A bundle of processors that speak the gRPC protocol</description>
<modules>
<module>nifi-grpc-processors</module>
<module>nifi-grpc-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-processors</artifactId>
<version>1.4.0-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -83,7 +83,8 @@
<module>nifi-cybersecurity-bundle</module>
<module>nifi-parquet-bundle</module>
<module>nifi-extension-utils</module>
<module>nifi-redis-bundle</module>
<module>nifi-grpc-bundle</module>
<module>nifi-redis-bundle</module>
</modules>
<build>

View File

@ -1414,6 +1414,12 @@
<version>1.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-grpc-nar</artifactId>
<version>1.4.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-stateful-analysis-nar</artifactId>