NIFI-12115 This closes #7830. Added ListenOTLP to collect OpenTelemetry

- Added ListenOTLP Processor supporting OpenTelemetry OTLP 1.0.0 Specification with gRPC and HTTP
- Updated nifi-event-transport to support configurable SSLParameters for configurable Cipher Suites

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2023-09-30 16:22:46 -05:00 committed by Joseph Witt
parent 721628eb95
commit 6394912cce
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
45 changed files with 3936 additions and 8 deletions

View File

@ -612,6 +612,12 @@ language governing permissions and limitations under the License. -->
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-opentelemetry-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-snmp-nar</artifactId>

View File

@ -41,6 +41,7 @@ import org.apache.nifi.event.transport.netty.channel.ssl.ServerSslHandlerChannel
import org.apache.nifi.security.util.ClientAuth;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Collections;
@ -58,7 +59,7 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
private final TransportProtocol protocol;
private Supplier<List<ChannelHandler>> handlerSupplier = () -> Collections.emptyList();
private Supplier<List<ChannelHandler>> handlerSupplier = Collections::emptyList;
private Integer socketReceiveBuffer;
@ -66,6 +67,8 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
private SSLContext sslContext;
private SSLParameters sslParameters;
private ClientAuth clientAuth = ClientAuth.NONE;
private Duration shutdownQuietPeriod = ShutdownQuietPeriod.DEFAULT.getDuration();
@ -118,6 +121,15 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
this.sslContext = sslContext;
}
/**
* Set SSL Parameters for optional additional configuration of TLS negotiation
*
* @param sslParameters SSL Parameters
*/
public void setSslParameters(final SSLParameters sslParameters) {
this.sslParameters = sslParameters;
}
/**
* Set Client Authentication
*
@ -203,10 +215,21 @@ public class NettyEventServerFactory extends EventLoopGroupFactory implements Ev
}
}
private ChannelInitializer getChannelInitializer() {
final StandardChannelInitializer<Channel> channelInitializer = sslContext == null
? new StandardChannelInitializer<>(handlerSupplier)
: new ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth);
private ChannelInitializer<?> getChannelInitializer() {
final StandardChannelInitializer<Channel> channelInitializer;
if (sslContext == null) {
channelInitializer = new StandardChannelInitializer<>(handlerSupplier);
} else {
final SSLParameters parameters;
if (sslParameters == null) {
parameters = sslContext.getDefaultSSLParameters();
} else {
parameters = sslParameters;
}
channelInitializer = new ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth, parameters);
}
if (idleTimeout != null) {
channelInitializer.setIdleTimeout(idleTimeout);
}

View File

@ -69,9 +69,13 @@ public class StandardChannelInitializer<T extends Channel> extends ChannelInitia
@Override
protected void initChannel(Channel channel) {
final ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
pipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
pipeline.addLast(new CloseContextIdleStateHandler());
if (idleTimeout.getSeconds() > 0) {
pipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS));
pipeline.addLast(new CloseContextIdleStateHandler());
}
handlerSupplier.get().forEach(pipeline::addLast);
}
}

View File

@ -25,6 +25,7 @@ import org.apache.nifi.security.util.ClientAuth;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
@ -38,16 +39,26 @@ public class ServerSslHandlerChannelInitializer<T extends Channel> extends Stan
private final ClientAuth clientAuth;
private final SSLParameters sslParameters;
/**
* Server SSL Channel Initializer with handlers and SSLContext
*
* @param handlerSupplier Channel Handler Supplier
* @param sslContext SSLContext
* @param clientAuth Client Authentication configuration
* @param sslParameters SSL Parameters
*/
public ServerSslHandlerChannelInitializer(final Supplier<List<ChannelHandler>> handlerSupplier, final SSLContext sslContext, final ClientAuth clientAuth) {
public ServerSslHandlerChannelInitializer(
final Supplier<List<ChannelHandler>> handlerSupplier,
final SSLContext sslContext,
final ClientAuth clientAuth,
final SSLParameters sslParameters
) {
super(handlerSupplier);
this.sslContext = Objects.requireNonNull(sslContext, "SSLContext is required");
this.clientAuth = Objects.requireNonNull(clientAuth, "ClientAuth is required");
this.sslParameters = Objects.requireNonNull(sslParameters, "SSLParameters is required");
}
@Override
@ -59,6 +70,8 @@ public class ServerSslHandlerChannelInitializer<T extends Channel> extends Stan
private SslHandler newSslHandler() {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setSSLParameters(sslParameters);
sslEngine.setUseClientMode(false);
if (ClientAuth.REQUIRED.equals(clientAuth)) {
sslEngine.setNeedClientAuth(true);

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-opentelemetry-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-opentelemetry-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-opentelemetry-processors</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,317 @@
nifi-opentelemetry-nar
Copyright 2014-2023 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
===========================================
Apache Software License v2
===========================================
The following binary components are provided under the Apache Software License v2
(ASLv2) Apache Commons Codec
The following NOTICE information applies:
Apache Commons Codec
Copyright 2002-2023 The Apache Software Foundation
src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
contains test data from http://aspell.net/test/orig/batch0.tab.
Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
===============================================================================
The content of package org.apache.commons.codec.language.bm has been translated
from the original php source code available at http://stevemorse.org/phoneticinfo.htm
with permission from the original authors.
Original source copyright:
Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
(ASLv2) Jackson JSON processor
The following NOTICE information applies:
# Jackson JSON processor
Jackson is a high-performance, Free/Open Source JSON processing library.
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
been in development since 2007.
It is currently developed by a community of developers, as well as supported
commercially by FasterXML.com.
## Licensing
Jackson core and extension components may licensed under different licenses.
To find the details that apply to this artifact see the accompanying LICENSE file.
For more information, including possible other licensing options, contact
FasterXML.com (http://fasterxml.com).
## Credits
A list of contributors may be found from CREDITS file, which is included
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
(ASLv2) The Netty Project
The following NOTICE information applies:
The Netty Project
=================
Please visit the Netty web site for more information:
* https://netty.io/
Copyright 2014 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* https://www.slf4j.org/
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* NOTICE:
* license/NOTICE.harmony.txt
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* https://archive.apache.org/dist/harmony/
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product optionally depends on 'zstd-jni', a zstd-jni Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.zstd-jni.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/luben/zstd-jni
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* https://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jboss-remoting/jboss-marshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* https://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* https://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at:
* LICENSE:
* license/LICENSE.hyper-hpack.txt (MIT License)
* HOMEPAGE:
* https://github.com/python-hyper/hpack/
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at:
* LICENSE:
* license/LICENSE.nghttp2-hpack.txt (MIT License)
* HOMEPAGE:
* https://github.com/nghttp2/nghttp2/
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
* LICENSE:
* license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/takari/maven-wrapper
This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS.
This private header is also used by Apple's open source
mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
* LICENSE:
* license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
* HOMEPAGE:
* https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
This product optionally depends on 'Brotli4j', Brotli compression and
decompression for Java., which can be obtained at:
* LICENSE:
* license/LICENSE.brotli4j.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/hyperxpro/Brotli4j

View File

@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-opentelemetry-bundle</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-opentelemetry-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-transport</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.hubspot.jackson</groupId>
<artifactId>jackson-datatype-protobuf</artifactId>
<version>0.9.14</version>
<exclusions>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>1.0.0-alpha</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-web-client</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry;
import com.google.protobuf.Message;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.EventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName;
import org.apache.nifi.processors.opentelemetry.io.RequestCallback;
import org.apache.nifi.processors.opentelemetry.io.RequestCallbackProvider;
import org.apache.nifi.processors.opentelemetry.server.HttpServerFactory;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@DefaultSchedule(period = "25 ms")
@Tags({"OpenTelemetry", "OTel", "OTLP", "telemetry", "metrics", "traces", "logs"})
@CapabilityDescription(
"Collect OpenTelemetry messages over HTTP or gRPC. " +
"Supports standard Export Service Request messages for logs, metrics, and traces. " +
"Implements OpenTelemetry OTLP Specification 1.0.0 with OTLP/gRPC and OTLP/HTTP. " +
"Provides protocol detection using the HTTP Content-Type header."
)
@WritesAttributes({
@WritesAttribute(attribute = TelemetryAttributeName.MIME_TYPE, description = "Content-Type set to application/json"),
@WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_TYPE, description = "OpenTelemetry Resource Type: LOGS, METRICS, or TRACES"),
@WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_COUNT, description = "Count of resource elements included in messages"),
})
public class ListenOTLP extends AbstractProcessor {
static final PropertyDescriptor ADDRESS = new PropertyDescriptor.Builder()
.name("Address")
.displayName("Address")
.description("Internet Protocol Address on which to listen for OTLP Export Service Requests. The default value enables listening on all addresses.")
.required(true)
.defaultValue("0.0.0.0")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
.displayName("Port")
.description("TCP port number on which to listen for OTLP Export Service Requests over HTTP and gRPC")
.required(true)
.defaultValue("4317")
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.displayName("SSL Context Service")
.description("SSL Context Service enables TLS communication for HTTPS")
.required(true)
.identifiesControllerService(SSLContextService.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor CLIENT_AUTHENTICATION = new PropertyDescriptor.Builder()
.name("Client Authentication")
.displayName("Client Authentication")
.description("Client authentication policy for TLS communication with HTTPS")
.required(true)
.allowableValues(ClientAuth.values())
.defaultValue(ClientAuth.WANT.name())
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor WORKER_THREADS = new PropertyDescriptor.Builder()
.name("Worker Threads")
.displayName("Worker Threads")
.description("Number of threads responsible for decoding and queuing incoming OTLP Export Service Requests")
.required(true)
.defaultValue("2")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor QUEUE_CAPACITY = new PropertyDescriptor.Builder()
.name("Queue Capacity")
.displayName("Queue Capacity")
.description("Maximum number of OTLP request resource elements that can be received and queued")
.required(true)
.defaultValue("1000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.displayName("Batch Size")
.description("Maximum number of OTLP request resource elements included in each FlowFile produced")
.required(true)
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Export Service Requests containing OTLP Telemetry")
.build();
private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(SUCCESS);
private static final List<PropertyDescriptor> DESCRIPTORS = List.of(
ADDRESS,
PORT,
SSL_CONTEXT_SERVICE,
CLIENT_AUTHENTICATION,
WORKER_THREADS,
QUEUE_CAPACITY,
BATCH_SIZE
);
private static final String TRANSIT_URI_FORMAT = "https://%s:%d";
private Iterator<RequestCallback> requestCallbackProvider;
private EventServer server;
@Override
public final Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws UnknownHostException {
final EventServerFactory eventServerFactory = createEventServerFactory(context);
server = eventServerFactory.getEventServer();
}
@OnStopped
public void onStopped() {
if (server != null) {
server.shutdown();
server = null;
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
while (requestCallbackProvider.hasNext()) {
final RequestCallback requestCallback = requestCallbackProvider.next();
processRequestCallback(session, requestCallback);
}
}
int getPort() {
return server.getListeningPort();
}
private void processRequestCallback(final ProcessSession session, final RequestCallback requestCallback) {
final String transitUri = requestCallback.getTransitUri();
FlowFile flowFile = session.create();
try {
flowFile = session.write(flowFile, requestCallback);
flowFile = session.putAllAttributes(flowFile, requestCallback.getAttributes());
session.getProvenanceReporter().receive(flowFile, transitUri);
session.transfer(flowFile, SUCCESS);
} catch (final Exception e) {
getLogger().warn("Request Transit URI [{}] processing failed {}", transitUri, flowFile, e);
session.remove(flowFile);
}
}
private EventServerFactory createEventServerFactory(final ProcessContext context) throws UnknownHostException {
final String address = context.getProperty(ADDRESS).getValue();
final InetAddress serverAddress = InetAddress.getByName(address);
final int port = context.getProperty(PORT).asInteger();
final URI transitBaseUri = URI.create(String.format(TRANSIT_URI_FORMAT, serverAddress.getCanonicalHostName(), port));
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final int queueCapacity = context.getProperty(QUEUE_CAPACITY).asInteger();
final BlockingQueue<Message> messages = new LinkedBlockingQueue<>(queueCapacity);
requestCallbackProvider = new RequestCallbackProvider(transitBaseUri, batchSize, messages);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslContextService.createContext();
final NettyEventServerFactory eventServerFactory = new HttpServerFactory(getLogger(), messages, serverAddress, port, sslContext);
eventServerFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
final int workerThreads = context.getProperty(WORKER_THREADS).asInteger();
eventServerFactory.setWorkerThreads(workerThreads);
final ClientAuth clientAuth = ClientAuth.valueOf(context.getProperty(CLIENT_AUTHENTICATION).getValue());
eventServerFactory.setClientAuth(clientAuth);
return eventServerFactory;
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.hubspot.jackson.datatype.protobuf.ProtobufJacksonConfig;
import com.hubspot.jackson.datatype.protobuf.builtin.deserializers.MessageDeserializer;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
/**
* ByteString Field Deserializer supporting conversion from hexadecimal to ByteString for selected fields
*
* @param <T> Message Type
* @param <V> Message Builder Type
*/
public class ByteStringFieldDeserializer<T extends Message, V extends Message.Builder> extends MessageDeserializer<T, V> {
private static final Set<String> HEXADECIMAL_BYTE_STRING_FIELDS = Arrays.stream(HexadecimalByteStringField.values())
.map(HexadecimalByteStringField::getField)
.collect(Collectors.toSet());
/**
* Deserializer constructor with Message Type class to be deserialized
*
* @param messageType Message Type class to be deserialized
* @param config Jackson Configuration for Protobuf
*/
public ByteStringFieldDeserializer(final Class<T> messageType, final ProtobufJacksonConfig config) {
super(messageType, config);
}
/**
* Read value from JSON Parser and decode hexadecimal ByteString fields when found
*
* @param builder Protobuf Message Builder
* @param field Message Field Descriptor
* @param defaultInstance Protobuf default instance of Message
* @param parser JSON Parser
* @param context JSON Deserialization Context for parsing
* @return Object value read
* @throws IOException Thrown on parsing failures
*/
@Override
protected Object readValue(
final Message.Builder builder,
final Descriptors.FieldDescriptor field,
final Message defaultInstance,
final JsonParser parser,
final DeserializationContext context
) throws IOException {
final String jsonName = field.getJsonName();
final Object value;
if (HEXADECIMAL_BYTE_STRING_FIELDS.contains(jsonName)) {
final String encoded = parser.getValueAsString();
if (encoded == null) {
value = null;
} else {
try {
final byte[] decoded = Hex.decodeHex(encoded);
value = ByteString.copyFrom(decoded);
} catch (DecoderException e) {
throw new IOException(String.format("Hexadecimal Field [%s] decoding failed", jsonName), e);
}
}
} else {
value = super.readValue(builder, field, defaultInstance, parser, context);
}
return value;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
/**
* Protobuf ByteString fields to be encoded as hexadecimal instead of Base64
*/
public enum HexadecimalByteStringField {
PARENT_SPAN_ID("parentSpanId"),
SPAN_ID("spanId"),
TRACE_ID("traceId");
private final String field;
HexadecimalByteStringField(final String field) {
this.field = field;
}
public String getField() {
return field;
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.google.protobuf.Message;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.metrics.v1.Exemplar;
import io.opentelemetry.proto.trace.v1.Span;
/**
* Message Types requiring hexadecimal encoding and decoding
*/
public enum HexadecimalMessageType {
SPAN(Span.class),
SPAN_LINK(Span.Link.class),
EXEMPLAR(Exemplar.class),
LOG_RECORD(LogRecord.class);
private final Class<? extends Message> messageType;
HexadecimalMessageType(final Class<? extends Message> messageType) {
this.messageType = messageType;
}
public Class<? extends Message> getMessageType() {
return messageType;
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Objects;
/**
* Service Request Reader implementation based on JSON Parser supporting standard OTLP Request Types
*/
public class JsonServiceRequestReader implements ServiceRequestReader {
private static final RequestMapper REQUEST_MAPPER = new StandardRequestMapper();
/**
* Read Service Request parsed from stream
*
* @param inputStream Input Stream to be parsed
* @param requestType Request Message Type
* @return Service Request read
*/
@Override
public <T extends Message> T read(final InputStream inputStream, final Class<T> requestType) {
Objects.requireNonNull(inputStream, "Input Stream required");
try {
return REQUEST_MAPPER.readValue(inputStream, requestType);
} catch (final IOException e) {
final String message = String.format("JSON Request Type [%s] parsing failed", requestType.getName());
throw new UncheckedIOException(message, e);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Objects;
/**
* Service Request Reader implementation based on Protobuf Parser supporting standard OTLP Request Types
*/
public class ProtobufServiceRequestReader implements ServiceRequestReader {
private final Parser<ExportLogsServiceRequest> logsServiceRequestParser = ExportLogsServiceRequest.parser();
private final Parser<ExportMetricsServiceRequest> metricsServiceRequestParser = ExportMetricsServiceRequest.parser();
private final Parser<ExportTraceServiceRequest> traceServiceRequestParser = ExportTraceServiceRequest.parser();
/**
* Read Service Request parsed from Input Stream
*
* @param inputStream Input Stream to be parsed
* @return Service Request read
*/
@Override
public <T extends Message> T read(final InputStream inputStream, final Class<T> requestType) {
Objects.requireNonNull(inputStream, "Input Stream required");
Objects.requireNonNull(requestType, "Request Type required");
try {
final Object serviceRequest;
if (ExportLogsServiceRequest.class.isAssignableFrom(requestType)) {
serviceRequest = logsServiceRequestParser.parseFrom(inputStream);
} else if (ExportMetricsServiceRequest.class.isAssignableFrom(requestType)) {
serviceRequest = metricsServiceRequestParser.parseFrom(inputStream);
} else if (ExportTraceServiceRequest.class.isAssignableFrom(requestType)) {
serviceRequest = traceServiceRequestParser.parseFrom(inputStream);
} else {
throw new IllegalArgumentException(String.format("Service Request Class [%s] not supported", requestType.getName()));
}
return requestType.cast(serviceRequest);
} catch (final InvalidProtocolBufferException e) {
throw new UncheckedIOException("Request parsing failed", e);
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
* OTLP Export Service Request Mapper for encoding and decoding objects
*/
public interface RequestMapper {
/**
* Parse bytes and return Message object of specified class
*
* @param inputStream Stream of bytes to be parsed
* @param messageClass Protobuf Message Class
* @throws IOException Thrown on deserialization failures
*/
<T extends Message> T readValue(InputStream inputStream, Class<T> messageClass) throws IOException;
/**
* Write message to specified Output Stream
*
* @param outputStream Output Stream for serialized message bytes
* @param message Protobuf Message Class to be serialized
* @throws IOException Thrown on serialization failures
*/
void writeValue(OutputStream outputStream, Message message) throws IOException;
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponse;
/**
* Response Body Writer abstraction for serializing response status information
*/
public interface ResponseBodyWriter {
/**
* Get Response Body as serialized byte array according
* @param serviceRequestDescription Service Request Description
* @param serviceResponse Service Response
* @return Serialized byte array
*/
byte[] getResponseBody(ServiceRequestDescription serviceRequestDescription, ServiceResponse serviceResponse);
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.google.protobuf.Message;
import java.io.InputStream;
/**
* OTLP Export Service Request Reader abstraction for parsing objects from streams
*/
public interface ServiceRequestReader {
/**
* Read Service Request from Input Stream
*
* @param inputStream Input Stream of bytes to be parsed
* @param requestType Request Message Type
* @return Service Request read
*/
<T extends Message> T read(InputStream inputStream, Class<T> requestType);
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.protobuf.Message;
import com.hubspot.jackson.datatype.protobuf.MessageDeserializerFactory;
import com.hubspot.jackson.datatype.protobuf.ProtobufJacksonConfig;
/**
* Standard extension of Deserializer Factory supporting hexadecimal decoding to ByteString
*/
public class StandardMessageDeserializerFactory extends MessageDeserializerFactory {
private final ProtobufJacksonConfig protobufJacksonConfig;
public StandardMessageDeserializerFactory(final ProtobufJacksonConfig protobufJacksonConfig) {
super(protobufJacksonConfig);
this.protobufJacksonConfig = protobufJacksonConfig;
}
@Override
@SuppressWarnings("unchecked")
public JsonDeserializer<?> findBeanDeserializer(final JavaType type, final DeserializationConfig config, final BeanDescription beanDescription) throws JsonMappingException {
final Class<?> rawClass = type.getRawClass();
if (isHexadecimalMessageType(rawClass)) {
final Class<? extends Message> messageClass = (Class<? extends Message>) rawClass;
return new ByteStringFieldDeserializer<>(messageClass, protobufJacksonConfig).buildAtEnd();
} else {
return super.findBeanDeserializer(type, config, beanDescription);
}
}
private boolean isHexadecimalMessageType(final Class<?> rawClass) {
boolean found = false;
for (final HexadecimalMessageType hexadecimalMessageType : HexadecimalMessageType.values()) {
final Class<? extends Message> messageType = hexadecimalMessageType.getMessageType();
if (messageType.isAssignableFrom(rawClass)) {
found = true;
break;
}
}
return found;
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.fasterxml.jackson.databind.module.SimpleSerializers;
import com.google.protobuf.Message;
import com.hubspot.jackson.datatype.protobuf.ProtobufJacksonConfig;
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
/**
* Standard extension of Protobuf Jackson Module supporting OTLP JSON Protobuf Encoding specifications
*/
public class StandardProtobufModule extends ProtobufModule {
private static final ProtobufJacksonConfig protobufJacksonConfig = ProtobufJacksonConfig.builder()
.properUnsignedNumberSerialization(true)
.build();
public StandardProtobufModule() {
super(protobufJacksonConfig);
}
@Override
public void setupModule(SetupContext context) {
super.setupModule(context);
final TelemetryMessageSerializer telemetryMessageSerializer = new TelemetryMessageSerializer(protobufJacksonConfig);
final SimpleSerializers serializers = new SimpleSerializers();
for (final HexadecimalMessageType hexadecimalMessageType : HexadecimalMessageType.values()) {
final Class<? extends Message> messageType = hexadecimalMessageType.getMessageType();
serializers.addSerializer(messageType, telemetryMessageSerializer);
}
context.addSerializers(serializers);
final StandardMessageDeserializerFactory deserializerFactory = new StandardMessageDeserializerFactory(protobufJacksonConfig);
context.addDeserializers(deserializerFactory);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
/**
* Standard Request Mapper supporting Protobuf to JSON conversion following OTLP 1.0.0 conventions
*/
public class StandardRequestMapper implements RequestMapper {
private final ObjectMapper objectMapper;
/**
* Standard Request Mapper constructor configures Jackson ObjectMapper with Protobuf Module and OTLP serializers
*/
public StandardRequestMapper() {
objectMapper = new ObjectMapper();
// OTLP 1.0.0 requires enumerated values to be serialized as integers
objectMapper.enable(SerializationFeature.WRITE_ENUMS_USING_INDEX);
final StandardProtobufModule protobufModule = new StandardProtobufModule();
objectMapper.registerModule(protobufModule);
}
@Override
public <T extends Message> T readValue(final InputStream inputStream, final Class<T> messageClass) throws IOException {
Objects.requireNonNull(inputStream, "Input Stream required");
Objects.requireNonNull(messageClass, "Message Class required");
return objectMapper.readValue(inputStream, messageClass);
}
@Override
public void writeValue(final OutputStream outputStream, final Message message) throws IOException {
Objects.requireNonNull(outputStream, "Output Stream required");
Objects.requireNonNull(message, "Message required");
objectMapper.writeValue(outputStream, message);
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.google.protobuf.Message;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.ExportTracePartialSuccess;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponse;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponseStatus;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentType;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;
/**
* Standard implementation of Response Body Writer support Partial Success handling for rejected records
*/
public class StandardResponseBodyWriter implements ResponseBodyWriter {
private static final String CAPACITY_ERROR_MESSAGE = "Queue capacity reached";
private static final byte[] EMPTY_PROTOBUF_BODY = new byte[0];
private static final byte[] EMPTY_JSON_OBJECT_BODY = new byte[]{123, 125};
private static final RequestMapper REQUEST_MAPPER = new StandardRequestMapper();
@Override
public byte[] getResponseBody(final ServiceRequestDescription serviceRequestDescription, final ServiceResponse serviceResponse) {
Objects.requireNonNull(serviceRequestDescription, "Request Description required");
Objects.requireNonNull(serviceResponse, "Response required");
final byte[] responseBody;
final ServiceResponseStatus serviceResponseStatus = serviceResponse.getServiceResponseStatus();
if (ServiceResponseStatus.PARTIAL_SUCCESS == serviceResponseStatus) {
responseBody = getPartialSuccessResponseBody(serviceRequestDescription, serviceResponse);
} else {
final TelemetryContentType contentType = serviceRequestDescription.getContentType();
if (TelemetryContentType.APPLICATION_JSON == contentType) {
responseBody = EMPTY_JSON_OBJECT_BODY;
} else {
responseBody = EMPTY_PROTOBUF_BODY;
}
}
return responseBody;
}
private byte[] getPartialSuccessResponseBody(final ServiceRequestDescription serviceRequestDescription, final ServiceResponse serviceResponse) {
final Message message;
final int rejected = serviceResponse.getRejected();
final TelemetryRequestType requestType = serviceRequestDescription.getRequestType();
if (TelemetryRequestType.LOGS == requestType) {
final ExportLogsPartialSuccess partialSuccess = ExportLogsPartialSuccess.newBuilder()
.setRejectedLogRecords(rejected)
.setErrorMessage(CAPACITY_ERROR_MESSAGE)
.build();
message = ExportLogsServiceResponse.newBuilder()
.setPartialSuccess(partialSuccess)
.build();
} else if (TelemetryRequestType.METRICS == requestType) {
final ExportMetricsPartialSuccess partialSuccess = ExportMetricsPartialSuccess.newBuilder()
.setRejectedDataPoints(rejected)
.setErrorMessage(CAPACITY_ERROR_MESSAGE)
.build();
message = ExportMetricsServiceResponse.newBuilder()
.setPartialSuccess(partialSuccess)
.build();
} else if (TelemetryRequestType.TRACES == requestType) {
final ExportTracePartialSuccess partialSuccess = ExportTracePartialSuccess.newBuilder()
.setRejectedSpans(rejected)
.setErrorMessage(CAPACITY_ERROR_MESSAGE)
.build();
message = ExportTraceServiceResponse.newBuilder()
.setPartialSuccess(partialSuccess)
.build();
} else {
throw new IllegalArgumentException(String.format("Service Request Type [%s] not supported", requestType));
}
final TelemetryContentType contentType = serviceRequestDescription.getContentType();
final byte[] responseBody;
if (TelemetryContentType.APPLICATION_JSON == contentType) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
REQUEST_MAPPER.writeValue(outputStream, message);
responseBody = outputStream.toByteArray();
} catch (final IOException e) {
final String error = String.format("JSON Response Type [%s] serialization failed", message.getClass().getName());
throw new UncheckedIOException(error, e);
}
} else {
responseBody = message.toByteArray();
}
return responseBody;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.hubspot.jackson.datatype.protobuf.ProtobufJacksonConfig;
import com.hubspot.jackson.datatype.protobuf.builtin.serializers.MessageSerializer;
import org.apache.commons.codec.binary.Hex;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
/**
* OpenTelemetry extension of Protobuf Message Serializer supporting OTLP 1.0.0 customization of selected fields
*/
public class TelemetryMessageSerializer extends MessageSerializer {
private static final Set<String> HEXADECIMAL_BYTE_STRING_FIELDS = Arrays.stream(HexadecimalByteStringField.values())
.map(HexadecimalByteStringField::getField)
.collect(Collectors.toSet());
protected TelemetryMessageSerializer(final ProtobufJacksonConfig config) {
super(config);
}
/**
* Write value as JSON with hexadecimal encoding for selected ByteString fields
*
* @param field Message Field Descriptor
* @param value Value to be serialized
* @param generator JSON Generator
* @param serializerProvider JSON Serializer Provider
* @throws IOException Thrown on failures serializing values
*/
@Override
protected void writeValue(
Descriptors.FieldDescriptor field,
Object value,
JsonGenerator generator,
SerializerProvider serializerProvider
) throws IOException {
final String jsonName = field.getJsonName();
if (HEXADECIMAL_BYTE_STRING_FIELDS.contains(jsonName)) {
final ByteString byteString = (ByteString) value;
final String encoded = getEncodedByteString(byteString);
generator.writeString(encoded);
} else {
super.writeValue(field, value, generator, serializerProvider);
}
}
private String getEncodedByteString(final ByteString byteString) {
final ByteBuffer buffer = byteString.asReadOnlyByteBuffer();
return Hex.encodeHexString(buffer, false);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.io;
import org.apache.nifi.processor.io.OutputStreamCallback;
import java.util.Map;
/**
* Request Output Stream Callback supporting additional methods for tracking
*/
public interface RequestCallback extends OutputStreamCallback {
/**
* Get Transit URI for Provenance Reporting
*
* @return Transit URI
*/
String getTransitUri();
/**
* Get FlowFile attributes
*
* @return FlowFile attributes
*/
Map<String, String> getAttributes();
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.io;
import com.google.protobuf.Message;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
/**
* Request Callback Provider creates Callback instances for batches of Request messages
*/
public class RequestCallbackProvider implements Iterator<RequestCallback> {
private static final String EMPTY_ELEMENT = null;
private final URI transitBaseUri;
private final int batchSize;
private final BlockingQueue<Message> messages;
/**
* Request Callback Provider constructor with required parameters for queued messages
*
* @param transitBaseUri Transit Base URI to which the provider appends the Request Type path
* @param batchSize Maximum batch size for messages included in a single Request Callback
* @param messages Queue of messages to be processed
*/
public RequestCallbackProvider(final URI transitBaseUri, final int batchSize, final BlockingQueue<Message> messages) {
this.transitBaseUri = Objects.requireNonNull(transitBaseUri, "Transit Base URI required");
this.batchSize = batchSize;
this.messages = Objects.requireNonNull(messages, "Messages required");
}
/**
* Provider has next returns status based on queued messages
*
* @return Next callback available based on queued messages
*/
@Override
public boolean hasNext() {
return !messages.isEmpty();
}
/**
* Provider returns next Request Callback instance for handling a batch of Request messages
*
* @return Request Callback
* @throws NoSuchElementException Thrown when no messages queued
*/
@Override
public RequestCallback next() {
final Message head = messages.element();
final Class<? extends Message> headMessageClass = head.getClass();
final TelemetryRequestType requestType = getRequestType(headMessageClass);
final String transitUri = getTransitUri(requestType);
final List<Message> requestMessages = getRequestMessages(headMessageClass);
return new StandardRequestCallback(requestType, headMessageClass, requestMessages, transitUri);
}
private TelemetryRequestType getRequestType(final Class<?> headMessageClass) {
final TelemetryRequestType requestType;
if (ResourceLogs.class.isAssignableFrom(headMessageClass)) {
requestType = TelemetryRequestType.LOGS;
} else if (ResourceMetrics.class.isAssignableFrom(headMessageClass)) {
requestType = TelemetryRequestType.METRICS;
} else if (ResourceSpans.class.isAssignableFrom(headMessageClass)) {
requestType = TelemetryRequestType.TRACES;
} else {
throw new IllegalArgumentException(String.format("Request Class [%s] not supported", headMessageClass));
}
return requestType;
}
private String getTransitUri(final TelemetryRequestType requestType) {
try {
final URI uri = new URI(
transitBaseUri.getScheme(),
EMPTY_ELEMENT,
transitBaseUri.getHost(),
transitBaseUri.getPort(),
requestType.getPath(),
EMPTY_ELEMENT,
EMPTY_ELEMENT
);
return uri.toString();
} catch (final URISyntaxException e) {
throw new RuntimeException(e);
}
}
private List<Message> getRequestMessages(final Class<?> headMessageClass) {
final List<Message> requestMessages = new ArrayList<>(batchSize);
final Iterator<Message> currentMessages = messages.iterator();
while (currentMessages.hasNext()) {
final Message message = currentMessages.next();
final Class<?> messageClass = message.getClass();
if (headMessageClass.equals(messageClass)) {
requestMessages.add(message);
currentMessages.remove();
if (requestMessages.size() == batchSize) {
break;
}
}
}
return requestMessages;
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.io;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponse;
import java.nio.ByteBuffer;
/**
* Request Content Listener abstracts deserialization processing for Export Service Requests
*/
public interface RequestContentListener {
/**
* On Request handles buffer deserialization and returns request status
*
* @param buffer Request Content Buffer to be processed
* @param serviceRequestDescription Service Request Description describes reader attributes
* @return Service Response indicates processing results
*/
ServiceResponse onRequest(ByteBuffer buffer, ServiceRequestDescription serviceRequestDescription);
}

View File

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.io;
import com.google.protobuf.Message;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import org.apache.nifi.processors.opentelemetry.encoding.RequestMapper;
import org.apache.nifi.processors.opentelemetry.encoding.StandardRequestMapper;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Standard Request Callback implementation serializes messages to JSON according to OTLP 1.0.0 formatting
*/
class StandardRequestCallback implements RequestCallback {
private static final String APPLICATION_JSON = "application/json";
private static final RequestMapper REQUEST_MAPPER = new StandardRequestMapper();
private final Map<String, String> attributes = new LinkedHashMap<>();
private final Class<? extends Message> messageClass;
private final List<Message> messages;
private final String transitUri;
StandardRequestCallback(final TelemetryRequestType requestType, final Class<? extends Message> messageClass, final List<Message> messages, final String transitUri) {
this.messageClass = Objects.requireNonNull(messageClass, "Message Class required");
this.messages = Objects.requireNonNull(messages, "Messages required");
this.transitUri = Objects.requireNonNull(transitUri, "Transit URI required");
attributes.put(TelemetryAttributeName.MIME_TYPE, APPLICATION_JSON);
attributes.put(TelemetryAttributeName.RESOURCE_TYPE, requestType.name());
attributes.put(TelemetryAttributeName.RESOURCE_COUNT, Integer.toString(messages.size()));
}
/**
* Process messages serialized as JSON wrapped in Export Service Request class
*
* @param outputStream FlowFile OutputStream for writing serialized JSON
* @throws IOException Thrown on failures writing to OutputStream
*/
@Override
public void process(final OutputStream outputStream) throws IOException {
if (ResourceLogs.class.isAssignableFrom(messageClass)) {
final ExportLogsServiceRequest.Builder requestBuilder = ExportLogsServiceRequest.newBuilder();
for (final Message message : messages) {
final ResourceLogs resourceLogs = (ResourceLogs) message;
requestBuilder.addResourceLogs(resourceLogs);
}
final ExportLogsServiceRequest request = requestBuilder.build();
REQUEST_MAPPER.writeValue(outputStream, request);
} else if (ResourceMetrics.class.isAssignableFrom(messageClass)) {
final ExportMetricsServiceRequest.Builder requestBuilder = ExportMetricsServiceRequest.newBuilder();
for (final Message message : messages) {
final ResourceMetrics currentResourceMetrics = (ResourceMetrics) message;
requestBuilder.addResourceMetrics(currentResourceMetrics);
}
final ExportMetricsServiceRequest request = requestBuilder.build();
REQUEST_MAPPER.writeValue(outputStream, request);
} else if (ResourceSpans.class.isAssignableFrom(messageClass)) {
final ExportTraceServiceRequest.Builder requestBuilder = ExportTraceServiceRequest.newBuilder();
for (final Message message : messages) {
final ResourceSpans resourceSpans = (ResourceSpans) message;
requestBuilder.addResourceSpans(resourceSpans);
}
final ExportTraceServiceRequest request = requestBuilder.build();
REQUEST_MAPPER.writeValue(outputStream, request);
} else {
throw new IllegalArgumentException(String.format("Request Class [%s] not supported", messageClass));
}
}
/**
* Get Transit URI for Provenance Reporting
*
* @return Transit URI
*/
@Override
public String getTransitUri() {
return transitUri;
}
/**
* Get FlowFile attributes based on Request Type and messages queued
*
* @return FlowFile attributes
*/
@Override
public Map<String, String> getAttributes() {
return Collections.unmodifiableMap(attributes);
}
}

View File

@ -0,0 +1,254 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.io;
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.google.protobuf.Message;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.opentelemetry.encoding.JsonServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.encoding.ProtobufServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.encoding.ServiceRequestReader;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponse;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponseStatus;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentEncoding;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentType;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;
/**
* Standard implementation of OTLP Request Content Listener supporting available Request and Content Types
*/
public class StandardRequestContentListener implements RequestContentListener {
private static final int ZERO_MESSAGES = 0;
private static final byte COMPRESSED = 1;
private static final String CLIENT_SOCKET_ADDRESS = "client.socket.address";
private static final String CLIENT_SOCKET_PORT = "client.socket.port";
private final ServiceRequestReader protobufReader = new ProtobufServiceRequestReader();
private final ServiceRequestReader jsonReader = new JsonServiceRequestReader();
private final ComponentLog log;
private final BlockingQueue<Message> messages;
public StandardRequestContentListener(final ComponentLog log, final BlockingQueue<Message> messages) {
this.log = Objects.requireNonNull(log, "Log required");
this.messages = Objects.requireNonNull(messages, "Messages required");
}
/**
* Process Request Buffer supporting serialized Protobuf or JSON
*
* @param buffer Request Content Buffer to be processed
* @param serviceRequestDescription Service Request Description describes reader attributes
* @return Service Response with processing status
*/
@Override
public ServiceResponse onRequest(final ByteBuffer buffer, final ServiceRequestDescription serviceRequestDescription) {
Objects.requireNonNull(buffer, "Buffer required");
Objects.requireNonNull(serviceRequestDescription, "Description required");
ServiceResponse serviceResponse;
final InetSocketAddress remoteAddress = serviceRequestDescription.getRemoteAddress();
final TelemetryContentType contentType = serviceRequestDescription.getContentType();
if (TelemetryContentType.APPLICATION_GRPC == contentType) {
try {
final byte compression = buffer.get();
final int messageSize = buffer.getInt();
log.debug("Client Address [{}] Content-Type [{}] Message Size [{}] Compression [{}]", remoteAddress, contentType, messageSize, compression);
final TelemetryContentEncoding bufferEncoding;
if (COMPRESSED == compression) {
bufferEncoding = TelemetryContentEncoding.GZIP;
} else {
bufferEncoding = TelemetryContentEncoding.NONE;
}
final InputStream decodedStream = getDecodedStream(buffer, bufferEncoding);
serviceResponse = onSupportedRequest(decodedStream, serviceRequestDescription);
} catch (final Exception e) {
log.warn("Client Address [{}] Content-Type [{}] processing failed", remoteAddress, contentType, e);
serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
}
} else if (TelemetryContentType.APPLICATION_PROTOBUF == contentType || TelemetryContentType.APPLICATION_JSON == contentType) {
try {
final InputStream decodedStream = getDecodedStream(buffer, serviceRequestDescription.getContentEncoding());
serviceResponse = onSupportedRequest(decodedStream, serviceRequestDescription);
} catch (final Exception e) {
log.warn("Client Address [{}] Content-Type [{}] processing failed", remoteAddress, contentType, e);
serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
}
} else {
serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
}
return serviceResponse;
}
private ServiceResponse onSupportedRequest(final InputStream inputStream, final ServiceRequestDescription serviceRequestDescription) throws IOException {
final ServiceRequestReader serviceRequestReader;
final TelemetryContentType contentType = serviceRequestDescription.getContentType();
if (TelemetryContentType.APPLICATION_JSON == contentType) {
serviceRequestReader = jsonReader;
} else {
serviceRequestReader = protobufReader;
}
final TelemetryRequestType requestType = serviceRequestDescription.getRequestType();
final List<? extends Message> resourceMessages;
if (inputStream.available() == 0) {
resourceMessages = Collections.emptyList();
} else if (TelemetryRequestType.LOGS == requestType) {
resourceMessages = readMessages(inputStream, serviceRequestDescription, ExportLogsServiceRequest.class, serviceRequestReader);
} else if (TelemetryRequestType.METRICS == requestType) {
resourceMessages = readMessages(inputStream, serviceRequestDescription, ExportMetricsServiceRequest.class, serviceRequestReader);
} else if (TelemetryRequestType.TRACES == requestType) {
resourceMessages = readMessages(inputStream, serviceRequestDescription, ExportTraceServiceRequest.class, serviceRequestReader);
} else {
resourceMessages = null;
}
return onMessages(resourceMessages);
}
private <T extends Message> List<Message> readMessages(
final InputStream inputStream,
final ServiceRequestDescription serviceRequestDescription,
final Class<T> requestType,
final ServiceRequestReader serviceRequestReader
) {
final List<Message> messages = new ArrayList<>();
final List<KeyValue> clientSocketAttributes = getClientSocketAttributes(serviceRequestDescription);
final T parsed = serviceRequestReader.read(inputStream, requestType);
if (parsed instanceof ExportLogsServiceRequest request) {
for (final ResourceLogs resourceLogs : request.getResourceLogsList()) {
final Resource.Builder resource = resourceLogs.getResource().toBuilder();
resource.addAllAttributes(clientSocketAttributes);
final Message message = resourceLogs.toBuilder().setResource(resource).build();
messages.add(message);
}
} else if (parsed instanceof ExportMetricsServiceRequest request) {
for (final ResourceMetrics resourceMetrics : request.getResourceMetricsList()) {
final Resource.Builder resource = resourceMetrics.getResource().toBuilder();
resource.addAllAttributes(clientSocketAttributes);
final Message message = resourceMetrics.toBuilder().setResource(resource).build();
messages.add(message);
}
} else if (parsed instanceof ExportTraceServiceRequest request) {
for (final ResourceSpans resourceSpans : request.getResourceSpansList()) {
final Resource.Builder resource = resourceSpans.getResource().toBuilder();
resource.addAllAttributes(clientSocketAttributes);
final Message message = resourceSpans.toBuilder().setResource(resource).build();
messages.add(message);
}
} else {
throw new IllegalArgumentException(String.format("Request Type [%s] not supported", requestType.getName()));
}
return messages;
}
private List<KeyValue> getClientSocketAttributes(final ServiceRequestDescription serviceRequestDescription) {
final InetSocketAddress remoteAddress = serviceRequestDescription.getRemoteAddress();
final InetAddress remoteSocketAddress = remoteAddress.getAddress();
final String socketAddress = remoteSocketAddress.getHostAddress();
final int socketPort = remoteAddress.getPort();
final KeyValue clientSocketAddress = KeyValue.newBuilder()
.setKey(CLIENT_SOCKET_ADDRESS)
.setValue(AnyValue.newBuilder().setStringValue(socketAddress))
.build();
final KeyValue clientSocketPort = KeyValue.newBuilder()
.setKey(CLIENT_SOCKET_PORT)
.setValue(AnyValue.newBuilder().setIntValue(socketPort))
.build();
return List.of(clientSocketAddress, clientSocketPort);
}
private ServiceResponse onMessages(final List<? extends Message> resourceMessages) {
final ServiceResponse serviceResponse;
if (resourceMessages == null) {
serviceResponse = new ServiceResponse(ServiceResponseStatus.INVALID, ZERO_MESSAGES);
} else if (resourceMessages.isEmpty()) {
serviceResponse = new ServiceResponse(ServiceResponseStatus.SUCCESS, ZERO_MESSAGES);
} else {
int accepted = 0;
for (final Message resourceMessage : resourceMessages) {
if (messages.offer(resourceMessage)) {
accepted++;
}
}
final int rejected = resourceMessages.size() - accepted;
if (ZERO_MESSAGES == rejected) {
serviceResponse = new ServiceResponse(ServiceResponseStatus.SUCCESS, ZERO_MESSAGES);
} else if (ZERO_MESSAGES == accepted) {
serviceResponse = new ServiceResponse(ServiceResponseStatus.UNAVAILABLE, ZERO_MESSAGES);
} else {
serviceResponse = new ServiceResponse(ServiceResponseStatus.PARTIAL_SUCCESS, rejected);
}
}
return serviceResponse;
}
private InputStream getDecodedStream(final ByteBuffer buffer, final TelemetryContentEncoding contentEncoding) throws IOException {
final InputStream decodedStream;
if (TelemetryContentEncoding.GZIP == contentEncoding) {
decodedStream = new GZIPInputStream(new ByteBufferBackedInputStream(buffer));
} else {
decodedStream = new ByteBufferBackedInputStream(buffer);
}
return decodedStream;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
/**
* Enumeration of supported gRPC HTTP Headers
*/
public enum GrpcHeader {
GRPC_ENCODING("grpc-encoding"),
GRPC_MESSAGE("grpc-message"),
GRPC_STATUS("grpc-status");
private final String header;
GrpcHeader(final String header) {
this.header = header;
}
/**
* Get gRPC Header Name for HTTP messages
*
* @return gRPC Header Name
*/
public String getHeader() {
return header;
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
/**
* Enumeration of supported gRPC Status Codes
*/
public enum GrpcStatusCode {
OK(0),
CANCELLED(1),
UNKNOWN(2),
INVALID_ARGUMENT(3),
UNAVAILABLE(14);
private final int code;
GrpcStatusCode(final int code) {
this.code = code;
}
public int getCode() {
return code;
}
}

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
import java.net.InetSocketAddress;
/**
* Service Request Description from Headers
*/
public interface ServiceRequestDescription {
/**
* Get Request Content-Encoding
*
* @return Request Content-Encoding indicating compression
*/
TelemetryContentEncoding getContentEncoding();
/**
* Get Request Content-Type
*
* @return Request Content-Type
*/
TelemetryContentType getContentType();
/**
* Get Export Service Request Type
*
* @return Export Service Request Type
*/
TelemetryRequestType getRequestType();
/**
* Get Remote Address
*
* @return Remote Address
*/
InetSocketAddress getRemoteAddress();
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
import java.util.Objects;
/**
* Service Processing Response with status and number of rejected elements
*/
public class ServiceResponse {
private final ServiceResponseStatus serviceResponseStatus;
private final int rejected;
public ServiceResponse(final ServiceResponseStatus serviceResponseStatus, final int rejected) {
this.serviceResponseStatus = Objects.requireNonNull(serviceResponseStatus, "Status required");
this.rejected = rejected;
}
/**
* Get Service Response Status including HTTP Status
*
* @return Service Response Status
*/
public ServiceResponseStatus getServiceResponseStatus() {
return serviceResponseStatus;
}
/**
* Get rejected resource elements
*
* @return Rejected resource elements
*/
public int getRejected() {
return rejected;
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
/**
* Service Response Processing Status with HTTP Status Code
*/
public enum ServiceResponseStatus {
SUCCESS(200),
PARTIAL_SUCCESS(200),
UNAVAILABLE(503),
INVALID(400);
private final int statusCode;
ServiceResponseStatus(final int statusCode) {
this.statusCode = statusCode;
}
public int getStatusCode() {
return statusCode;
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* Standard implementation of Service Request Description
*/
public class StandardServiceRequestDescription implements ServiceRequestDescription {
private final TelemetryContentEncoding contentEncoding;
private final TelemetryContentType contentType;
private final TelemetryRequestType requestType;
private final InetSocketAddress remoteAddress;
public StandardServiceRequestDescription(
final TelemetryContentEncoding contentEncoding,
final TelemetryContentType contentType,
final TelemetryRequestType requestType,
final InetSocketAddress remoteAddress
) {
this.contentEncoding = Objects.requireNonNull(contentEncoding, "Content Encoding required");
this.contentType = Objects.requireNonNull(contentType, "Content Type required");
this.requestType = Objects.requireNonNull(requestType, "Request Type required");
this.remoteAddress = Objects.requireNonNull(remoteAddress, "Remote Address required");
}
@Override
public TelemetryContentEncoding getContentEncoding() {
return contentEncoding;
}
@Override
public TelemetryContentType getContentType() {
return contentType;
}
@Override
public TelemetryRequestType getRequestType() {
return requestType;
}
@Override
public InetSocketAddress getRemoteAddress() {
return remoteAddress;
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
/**
* OpenTelemetry FlowFile Attribute Names
*/
public interface TelemetryAttributeName {
/** Count of resource elements */
String RESOURCE_COUNT = "resource.count";
/** Type of resource */
String RESOURCE_TYPE = "resource.type";
/** MIME Content Type */
String MIME_TYPE = "mime.type";
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
/**
* Enumeration of supported OpenTelemetry OTLP 1.0.0 HTTP Content Encodings
*/
public enum TelemetryContentEncoding {
GZIP("gzip"),
NONE("none");
private final String contentEncoding;
TelemetryContentEncoding(final String contentEncoding) {
this.contentEncoding = contentEncoding;
}
public String getContentEncoding() {
return contentEncoding;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
/**
* Enumeration of supported OpenTelemetry OTLP 1.0.0 HTTP Content Types
*/
public enum TelemetryContentType {
APPLICATION_GRPC("application/grpc"),
APPLICATION_JSON("application/json"),
APPLICATION_PROTOBUF("application/x-protobuf");
private final String contentType;
TelemetryContentType(final String contentType) {
this.contentType = contentType;
}
public String getContentType() {
return contentType;
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.protocol;
/**
* Enumeration of supported OpenTelemetry OTLP 1.0.0 Telemetry Types
*/
public enum TelemetryRequestType {
LOGS("/v1/logs", "/opentelemetry.proto.collector.logs.v1.LogsService/Export"),
METRICS("/v1/metrics", "/opentelemetry.proto.collector.metrics.v1.MetricsService/Export"),
TRACES("/v1/traces", "/opentelemetry.proto.collector.trace.v1.TraceService/Export");
private final String path;
private final String grpcPath;
TelemetryRequestType(final String path, final String grpcPath) {
this.path = path;
this.grpcPath = grpcPath;
}
/**
* Get URI path for REST requests
*
* @return URI Pat
*/
public String getPath() {
return path;
}
/**
* Get URI path for gRPC requests
*
* @return URI path for gRPC requests
*/
public String getGrpcPath() {
return grpcPath;
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.server;
import com.google.protobuf.Message;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerExpectContinueHandler;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapter;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslHandler;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.opentelemetry.io.RequestContentListener;
import org.apache.nifi.processors.opentelemetry.io.StandardRequestContentListener;
import javax.net.ssl.SSLEngine;
import java.util.concurrent.BlockingQueue;
/**
* HTTP Protocol Negotiation Handler configures Channel Pipeline based on HTTP/2 or HTTP/1.1
*/
public class HttpProtocolNegotiationHandler extends ApplicationProtocolNegotiationHandler {
/** Set HTTP/1.1 as the default Application Protocol when not provided during TLS negotiation */
private static final String DEFAULT_APPLICATION_PROTOCOL = ApplicationProtocolNames.HTTP_1_1;
/** Set maximum input content length to 10 MB */
private static final int DEFAULT_MAXIMUM_CONTENT_LENGTH = 10485760;
private static final boolean SERVER_CONNECTION = true;
private final ComponentLog log;
private final BlockingQueue<Message> messages;
/**
* HTTP Protocol Negotiation Handler defaults to HTTP/1.1 when clients do not indicate supported Application Protocols
*
* @param log Component Log
* @param messages Queue of Telemetry Messages
*/
public HttpProtocolNegotiationHandler(final ComponentLog log, final BlockingQueue<Message> messages) {
super(DEFAULT_APPLICATION_PROTOCOL);
this.log = log;
this.messages = messages;
}
/**
* Configure Channel Pipeline based on negotiated Application Layer Protocol
*
* @param channelHandlerContext Channel Handler Context
* @param protocol Negotiated Protocol ignored in favor of SSLEngine.getApplicationProtocol()
*/
@Override
protected void configurePipeline(final ChannelHandlerContext channelHandlerContext, final String protocol) {
final ChannelPipeline pipeline = channelHandlerContext.pipeline();
final RequestContentListener requestContentListener = new StandardRequestContentListener(log, messages);
final String applicationProtocol = getApplicationProtocol(channelHandlerContext);
final HttpRequestHandler httpRequestHandler = new HttpRequestHandler(log, requestContentListener);
if (ApplicationProtocolNames.HTTP_2.equals(applicationProtocol)) {
// Build Connection Handler for HTTP/2 processing as FullHttpRequest objects for HttpRequestHandler
final DefaultHttp2Connection connection = new DefaultHttp2Connection(SERVER_CONNECTION);
final InboundHttp2ToHttpAdapter frameListener = new InboundHttp2ToHttpAdapterBuilder(connection)
.propagateSettings(true)
.validateHttpHeaders(false)
.maxContentLength(DEFAULT_MAXIMUM_CONTENT_LENGTH)
.build();
final HttpToHttp2ConnectionHandler connectionHandler = new HttpToHttp2ConnectionHandlerBuilder()
.frameListener(frameListener)
.connection(connection)
.build();
pipeline.addLast(connectionHandler, httpRequestHandler);
} else if (ApplicationProtocolNames.HTTP_1_1.equals(applicationProtocol)) {
pipeline.addLast(
new HttpServerCodec(),
new HttpContentCompressor(),
new HttpServerExpectContinueHandler(),
new HttpObjectAggregator(DEFAULT_MAXIMUM_CONTENT_LENGTH),
httpRequestHandler
);
} else {
throw new IllegalStateException(String.format("Application Protocol [%s] not supported", applicationProtocol));
}
}
/**
* Get Application Protocol for SSLEngine because Netty SslHandler does not handle standard SSLEngine methods
*
* @param channelHandlerContext Channel Handler Context containing SslHandler
* @return Application Protocol defaults to HTTP/1.1 when not provided
*/
private String getApplicationProtocol(final ChannelHandlerContext channelHandlerContext) {
final SslHandler sslHandler = channelHandlerContext.pipeline().get(SslHandler.class);
final SSLEngine sslEngine = sslHandler.engine();
final String negotiatedApplicationProtocol = sslEngine.getApplicationProtocol();
final String applicationProtocol;
if (negotiatedApplicationProtocol == null || negotiatedApplicationProtocol.isEmpty()) {
applicationProtocol = DEFAULT_APPLICATION_PROTOCOL;
} else {
applicationProtocol = negotiatedApplicationProtocol;
}
return applicationProtocol;
}
}

View File

@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.HttpConversionUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.opentelemetry.encoding.ResponseBodyWriter;
import org.apache.nifi.processors.opentelemetry.encoding.StandardResponseBodyWriter;
import org.apache.nifi.processors.opentelemetry.io.RequestContentListener;
import org.apache.nifi.processors.opentelemetry.protocol.GrpcHeader;
import org.apache.nifi.processors.opentelemetry.protocol.GrpcStatusCode;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponse;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceResponseStatus;
import org.apache.nifi.processors.opentelemetry.protocol.ServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.StandardServiceRequestDescription;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentEncoding;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentType;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;
import org.apache.nifi.util.StringUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Objects;
/**
* HTTP Handler for OTLP Export Service Requests over gGRPC or encoded as JSON or Protobuf over HTTP
*/
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final ResponseBodyWriter responseBodyWriter = new StandardResponseBodyWriter();
private final ComponentLog log;
private final RequestContentListener requestContentListener;
/**
* HTTP Request Handler constructor with Component Log associated with Processor
*
* @param log Component Log
* @param requestContentListener Service Request Listener
*/
public HttpRequestHandler(final ComponentLog log, final RequestContentListener requestContentListener) {
this.log = Objects.requireNonNull(log, "Component Log required");
this.requestContentListener = Objects.requireNonNull(requestContentListener, "Listener required");
}
/**
* Channel Read handles Full HTTP Request objects
*
* @param channelHandlerContext Channel Handler Context
* @param httpRequest Full HTTP Request to be processed
*/
@Override
protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final FullHttpRequest httpRequest) {
if (HttpMethod.POST == httpRequest.method()) {
handleHttpPostRequest(channelHandlerContext, httpRequest);
} else {
sendCloseResponse(channelHandlerContext, httpRequest, HttpResponseStatus.METHOD_NOT_ALLOWED);
}
}
private void handleHttpPostRequest(final ChannelHandlerContext channelHandlerContext, final FullHttpRequest httpRequest) {
final HttpHeaders headers = httpRequest.headers();
final String requestContentType = headers.get(HttpHeaderNames.CONTENT_TYPE);
final TelemetryContentType telemetryContentType = getTelemetryContentType(requestContentType);
if (telemetryContentType == null) {
sendCloseResponse(channelHandlerContext, httpRequest, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE);
} else {
final String uri = httpRequest.uri();
final TelemetryRequestType telemetryRequestType = getTelemetryRequestType(uri, telemetryContentType);
if (telemetryRequestType == null) {
sendCloseResponse(channelHandlerContext, httpRequest, HttpResponseStatus.NOT_FOUND);
} else {
handleHttpPostRequestTypeSupported(channelHandlerContext, httpRequest, telemetryRequestType, telemetryContentType);
}
}
}
private void handleHttpPostRequestTypeSupported(
final ChannelHandlerContext channelHandlerContext,
final FullHttpRequest httpRequest,
final TelemetryRequestType telemetryRequestType,
final TelemetryContentType telemetryContentType
) {
final HttpHeaders headers = httpRequest.headers();
final String requestContentEncoding = headers.get(HttpHeaderNames.CONTENT_ENCODING);
final TelemetryContentEncoding telemetryContentEncoding = getTelemetryContentEncoding(requestContentEncoding);
final InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
final ServiceRequestDescription serviceRequestDescription = new StandardServiceRequestDescription(
telemetryContentEncoding,
telemetryContentType,
telemetryRequestType,
remoteAddress
);
final ByteBuf content = httpRequest.content();
final int readableBytes = content.readableBytes();
final TelemetryContentType contentType = serviceRequestDescription.getContentType();
log.debug("HTTP Content Received: Client Address [{}] Content-Type [{}] Bytes [{}]", remoteAddress, contentType.getContentType(), readableBytes);
final ByteBuffer contentBuffer = content.nioBuffer();
final ServiceResponse serviceResponse = requestContentListener.onRequest(contentBuffer, serviceRequestDescription);
sendResponse(channelHandlerContext, httpRequest, serviceRequestDescription, serviceResponse);
}
private TelemetryContentEncoding getTelemetryContentEncoding(final String requestContentEncoding) {
TelemetryContentEncoding telemetryContentEncoding = TelemetryContentEncoding.NONE;
final String contentEncoding = requestContentEncoding == null ? StringUtils.EMPTY : requestContentEncoding;
for (final TelemetryContentEncoding currentEncoding : TelemetryContentEncoding.values()) {
if (currentEncoding.getContentEncoding().contentEquals(contentEncoding)) {
telemetryContentEncoding = currentEncoding;
break;
}
}
return telemetryContentEncoding;
}
private TelemetryRequestType getTelemetryRequestType(final String path, final TelemetryContentType telemetryContentType) {
TelemetryRequestType telemetryRequestType = null;
for (final TelemetryRequestType currentType : TelemetryRequestType.values()) {
final String requestTypePath;
if (TelemetryContentType.APPLICATION_GRPC == telemetryContentType) {
requestTypePath = currentType.getGrpcPath();
} else {
requestTypePath = currentType.getPath();
}
if (requestTypePath.contentEquals(path)) {
telemetryRequestType = currentType;
break;
}
}
return telemetryRequestType;
}
private TelemetryContentType getTelemetryContentType(final String requestContentType) {
TelemetryContentType telemetryContentType = null;
for (final TelemetryContentType currentType : TelemetryContentType.values()) {
if (currentType.getContentType().equals(requestContentType)) {
telemetryContentType = currentType;
break;
}
}
return telemetryContentType;
}
private GrpcStatusCode getGrpcStatusCode(final ServiceResponseStatus serviceResponseStatus) {
final GrpcStatusCode grpcStatusCode;
if (ServiceResponseStatus.SUCCESS == serviceResponseStatus) {
grpcStatusCode = GrpcStatusCode.OK;
} else if (ServiceResponseStatus.INVALID == serviceResponseStatus) {
grpcStatusCode = GrpcStatusCode.INVALID_ARGUMENT;
} else if (ServiceResponseStatus.PARTIAL_SUCCESS == serviceResponseStatus) {
grpcStatusCode = GrpcStatusCode.OK;
} else if (ServiceResponseStatus.UNAVAILABLE == serviceResponseStatus) {
grpcStatusCode = GrpcStatusCode.UNAVAILABLE;
} else {
grpcStatusCode = GrpcStatusCode.UNKNOWN;
}
return grpcStatusCode;
}
private void sendCloseResponse(final ChannelHandlerContext channelHandlerContext, final HttpRequest httpRequest, final HttpResponseStatus httpResponseStatus) {
final SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
final HttpMethod method = httpRequest.method();
final String uri = httpRequest.uri();
final HttpVersion httpVersion = httpRequest.protocolVersion();
log.debug("HTTP Request Closed: Client Address [{}] Method [{}] URI [{}] Version [{}] HTTP {}", remoteAddress, method, uri, httpVersion, httpResponseStatus.code());
final FullHttpResponse response = new DefaultFullHttpResponse(httpVersion, httpResponseStatus);
setStreamId(httpRequest.headers(), response);
final ChannelFuture future = channelHandlerContext.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);
}
private void sendResponse(
final ChannelHandlerContext channelHandlerContext,
final FullHttpRequest httpRequest,
final ServiceRequestDescription serviceRequestDescription,
final ServiceResponse serviceResponse
) {
final SocketAddress remoteAddress = channelHandlerContext.channel().remoteAddress();
final HttpMethod method = httpRequest.method();
final String uri = httpRequest.uri();
final HttpVersion httpVersion = httpRequest.protocolVersion();
final ServiceResponseStatus serviceResponseStatus = serviceResponse.getServiceResponseStatus();
final HttpResponseStatus httpResponseStatus = HttpResponseStatus.valueOf(serviceResponseStatus.getStatusCode());
log.debug("HTTP Request Completed: Client Address [{}] Method [{}] URI [{}] Version [{}] HTTP {}", remoteAddress, method, uri, httpVersion, httpResponseStatus.code());
final FullHttpResponse response = new DefaultFullHttpResponse(httpVersion, httpResponseStatus);
setStreamId(httpRequest.headers(), response);
final TelemetryContentType telemetryContentType = serviceRequestDescription.getContentType();
response.headers().set(HttpHeaderNames.CONTENT_TYPE, telemetryContentType.getContentType());
if (TelemetryContentType.APPLICATION_GRPC == telemetryContentType) {
final GrpcStatusCode grpcStatusCode = getGrpcStatusCode(serviceResponseStatus);
response.headers().setInt(GrpcHeader.GRPC_STATUS.getHeader(), grpcStatusCode.getCode());
}
final byte[] responseBody = responseBodyWriter.getResponseBody(serviceRequestDescription, serviceResponse);
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, responseBody.length);
response.content().writeBytes(responseBody);
channelHandlerContext.writeAndFlush(response);
}
private void setStreamId(final HttpHeaders requestHeaders, final FullHttpResponse response) {
final String streamId = requestHeaders.get(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
if (streamId != null) {
response.headers().set(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(), streamId);
}
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.server;
import com.google.protobuf.Message;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
import org.apache.nifi.logging.ComponentLog;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
/**
* OpenTelemetry HTTP Server Factory for OTLP 1.0.0
*/
public class HttpServerFactory extends NettyEventServerFactory {
private static final String[] APPLICATION_PROTOCOLS = {ApplicationProtocolNames.HTTP_2, ApplicationProtocolNames.HTTP_1_1};
private static final Set<String> SUPPORTED_CIPHER_SUITES = new LinkedHashSet<>(Http2SecurityUtil.CIPHERS);
public HttpServerFactory(final ComponentLog log, final BlockingQueue<Message> messages, final InetAddress address, final int port, final SSLContext sslContext) {
super(address, port, TransportProtocol.TCP);
final SSLParameters sslParameters = getApplicationSslParameters(sslContext);
setSslParameters(sslParameters);
setSslContext(sslContext);
final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log);
setHandlerSupplier(() -> Arrays.asList(
new HttpProtocolNegotiationHandler(log, messages),
logExceptionChannelHandler
));
}
private SSLParameters getApplicationSslParameters(final SSLContext sslContext) {
final SSLParameters sslParameters = sslContext.getDefaultSSLParameters();
sslParameters.setApplicationProtocols(APPLICATION_PROTOCOLS);
final List<String> defaultCipherSuites = Arrays.asList(sslParameters.getCipherSuites());
final String[] cipherSuites = SupportedCipherSuiteFilter.INSTANCE.filterCipherSuites(defaultCipherSuites, defaultCipherSuites, SUPPORTED_CIPHER_SUITES);
sslParameters.setCipherSuites(cipherSuites);
return sslParameters;
}
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.opentelemetry.ListenOTLP

View File

@ -0,0 +1,508 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry;
import com.google.protobuf.Message;
import io.netty.handler.codec.http.HttpMethod;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsPartialSuccess;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import org.apache.nifi.processors.opentelemetry.encoding.RequestMapper;
import org.apache.nifi.processors.opentelemetry.encoding.StandardRequestMapper;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryContentType;
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryRequestType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.security.util.TlsPlatform;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.client.StandardWebClientService;
import org.apache.nifi.web.client.api.HttpEntityHeaders;
import org.apache.nifi.web.client.api.HttpResponseEntity;
import org.apache.nifi.web.client.api.HttpResponseStatus;
import org.apache.nifi.web.client.api.WebClientService;
import org.apache.nifi.web.client.ssl.TlsContext;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.X509KeyManager;
import javax.net.ssl.X509TrustManager;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.zip.GZIPOutputStream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@Timeout(15)
@ExtendWith(MockitoExtension.class)
class ListenOTLPTest {
private static final String LOCALHOST = "127.0.0.1";
private static final String RANDOM_PORT = "0";
private static final String HTTP_URL_FORMAT = "https://localhost:%d%s";
private static final String SERVICE_ID = SSLContextService.class.getSimpleName();
private static final String PATH_NOT_FOUND = "/not-found";
private static final String CONTENT_TYPE_HEADER = "Content-Type";
private static final String CONTENT_ENCODING_HEADER = "Content-Encoding";
private static final String GZIP_ENCODING = "gzip";
private static final String TEXT_PLAIN = "text/plain";
private static final String JSON_OBJECT_SUCCESS = "{}";
private static final byte[] JSON_OBJECT_SUCCESS_BYTES = JSON_OBJECT_SUCCESS.getBytes(StandardCharsets.UTF_8);
private static final String JSON_STRING_VALUE_LOCALHOST = "{\"stringValue\":\"127.0.0.1\"}";
private static final byte[] EMPTY_BYTES = new byte[]{};
private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(5);
private static final Duration READ_TIMEOUT = Duration.ofSeconds(5);
private static SSLContext sslContext;
private static X509TrustManager trustManager;
private static X509KeyManager keyManager;
@Mock
private SSLContextService sslContextService;
private WebClientService webClientService;
private TestRunner runner;
private ListenOTLP processor;
@BeforeAll
static void setSslContext() throws TlsException {
final TlsConfiguration tlsConfiguration = new TemporaryKeyStoreBuilder().build();
trustManager = SslContextFactory.getX509TrustManager(tlsConfiguration);
final KeyManagerFactory keyManagerFactory = KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration);
final KeyManager[] keyManagers = keyManagerFactory.getKeyManagers();
final Optional<KeyManager> firstKeyManager = Arrays.stream(keyManagers).findFirst();
final KeyManager configuredKeyManager = firstKeyManager.orElse(null);
keyManager = configuredKeyManager instanceof X509KeyManager ? (X509KeyManager) configuredKeyManager : null;
sslContext = SslContextFactory.createSslContext(tlsConfiguration);
}
@BeforeEach
void setRunner() {
processor = new ListenOTLP();
runner = TestRunners.newTestRunner(processor);
final StandardWebClientService standardWebClientService = new StandardWebClientService();
standardWebClientService.setReadTimeout(READ_TIMEOUT);
standardWebClientService.setConnectTimeout(CONNECT_TIMEOUT);
standardWebClientService.setWriteTimeout(READ_TIMEOUT);
standardWebClientService.setTlsContext(new TlsContext() {
@Override
public String getProtocol() {
return TlsPlatform.getLatestProtocol();
}
@Override
public X509TrustManager getTrustManager() {
return trustManager;
}
@Override
public Optional<X509KeyManager> getKeyManager() {
return Optional.ofNullable(keyManager);
}
});
webClientService = standardWebClientService;
}
@Test
void testRequiredProperties() throws InitializationException {
runner.assertNotValid();
runner.setProperty(ListenOTLP.ADDRESS, LOCALHOST);
setSslContextService();
runner.assertValid();
}
@Test
void testGetMethodNotAllowed() throws Exception {
startServer();
final URI uri = getUri(TelemetryRequestType.LOGS.getPath());
try (HttpResponseEntity httpResponseEntity = webClientService.get()
.uri(uri)
.retrieve()
) {
assertEquals(HttpResponseStatus.METHOD_NOT_ALLOWED.getCode(), httpResponseEntity.statusCode());
}
}
@Test
void testPostPathNotFound() throws Exception {
startServer();
final URI uri = getUri(PATH_NOT_FOUND);
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.header(CONTENT_TYPE_HEADER, TelemetryContentType.APPLICATION_JSON.getContentType())
.body(getRequestBody(), OptionalLong.empty())
.retrieve()
) {
assertEquals(HttpResponseStatus.NOT_FOUND.getCode(), httpResponseEntity.statusCode());
}
}
@Test
void testPostUnsupportedMediaType() throws Exception {
startServer();
final URI uri = getUri(TelemetryRequestType.LOGS.getPath());
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.body(getRequestBody(), OptionalLong.empty())
.header(CONTENT_TYPE_HEADER, TEXT_PLAIN)
.retrieve()
) {
assertEquals(HttpURLConnection.HTTP_UNSUPPORTED_TYPE, httpResponseEntity.statusCode());
}
}
@Test
void testPostEmptyJson() throws Exception {
startServer();
final URI uri = getUri(TelemetryRequestType.LOGS.getPath());
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.body(getRequestBody(), OptionalLong.empty())
.header(CONTENT_TYPE_HEADER, TelemetryContentType.APPLICATION_JSON.getContentType())
.retrieve()
) {
assertResponseSuccess(TelemetryContentType.APPLICATION_JSON, httpResponseEntity);
final byte[] responseBody = getResponseBody(httpResponseEntity.body());
assertArrayEquals(JSON_OBJECT_SUCCESS_BYTES, responseBody);
}
}
@Test
void testPostEmptyProtobuf() throws Exception {
startServer();
final URI uri = getUri(TelemetryRequestType.LOGS.getPath());
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.body(getRequestBody(), OptionalLong.empty())
.header(CONTENT_TYPE_HEADER, TelemetryContentType.APPLICATION_PROTOBUF.getContentType())
.retrieve()
) {
assertResponseSuccess(TelemetryContentType.APPLICATION_PROTOBUF, httpResponseEntity);
final byte[] responseBody = getResponseBody(httpResponseEntity.body());
assertArrayEquals(EMPTY_BYTES, responseBody);
}
}
@Test
void testPostEmptyGrpc() throws Exception {
startServer();
final URI uri = getUri(TelemetryRequestType.LOGS.getGrpcPath());
final byte[] uncompressedZeroMessageSize = new byte[]{0, 0, 0, 0, 0};
final ByteArrayInputStream requestBody = new ByteArrayInputStream(uncompressedZeroMessageSize);
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.body(requestBody, OptionalLong.empty())
.header(CONTENT_TYPE_HEADER, TelemetryContentType.APPLICATION_GRPC.getContentType())
.retrieve()
) {
assertResponseSuccess(TelemetryContentType.APPLICATION_GRPC, httpResponseEntity);
final byte[] responseBody = getResponseBody(httpResponseEntity.body());
assertArrayEquals(EMPTY_BYTES, responseBody);
}
}
@Test
void testPostProtobuf() throws Exception {
startServer();
final TelemetryRequestType requestType = TelemetryRequestType.LOGS;
final URI uri = getUri(requestType.getPath());
final ResourceLogs resourceLogs = ResourceLogs.newBuilder().build();
final ExportLogsServiceRequest request = ExportLogsServiceRequest.newBuilder()
.addResourceLogs(resourceLogs)
.build();
final byte[] protobufRequest = request.toByteArray();
final ByteArrayInputStream requestBody = new ByteArrayInputStream(protobufRequest);
final TelemetryContentType contentType = TelemetryContentType.APPLICATION_PROTOBUF;
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.body(requestBody, OptionalLong.of(protobufRequest.length))
.header(CONTENT_TYPE_HEADER, contentType.getContentType())
.retrieve()
) {
assertResponseSuccess(contentType, httpResponseEntity);
final byte[] responseBody = getResponseBody(httpResponseEntity.body());
assertArrayEquals(EMPTY_BYTES, responseBody);
}
runner.run(1, false, false);
assertFlowFileFound(requestType);
}
@Test
void testPostJsonCompressed() throws Exception {
startServer();
final TelemetryRequestType requestType = TelemetryRequestType.LOGS;
final URI uri = getUri(requestType.getPath());
final ResourceLogs resourceLogs = ResourceLogs.newBuilder().build();
final ExportLogsServiceRequest request = ExportLogsServiceRequest.newBuilder()
.addResourceLogs(resourceLogs)
.build();
final byte[] requestSerialized = getRequestSerialized(request);
final byte[] requestBody = getRequestCompressed(requestSerialized);
final ByteArrayInputStream requestBodyStream = new ByteArrayInputStream(requestBody);
final TelemetryContentType contentType = TelemetryContentType.APPLICATION_JSON;
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.body(requestBodyStream, OptionalLong.of(requestBody.length))
.header(CONTENT_TYPE_HEADER, contentType.getContentType())
.header(CONTENT_ENCODING_HEADER, GZIP_ENCODING)
.retrieve()
) {
assertResponseSuccess(contentType, httpResponseEntity);
final byte[] responseBody = getResponseBody(httpResponseEntity.body());
assertArrayEquals(JSON_OBJECT_SUCCESS_BYTES, responseBody);
}
runner.run(1, false, false);
assertFlowFileFound(requestType);
}
@Test
void testPostJsonPartialSuccess() throws Exception {
final int queueCapacity = 1;
runner.setProperty(ListenOTLP.QUEUE_CAPACITY, Integer.toString(queueCapacity));
startServer();
final TelemetryRequestType requestType = TelemetryRequestType.LOGS;
final URI uri = getUri(requestType.getPath());
final LogRecord logRecord = LogRecord.newBuilder().build();
final ScopeLogs scopeLogs = ScopeLogs.newBuilder().addLogRecords(logRecord).build();
final ResourceLogs resourceLogs = ResourceLogs.newBuilder().addScopeLogs(scopeLogs).build();
final ExportLogsServiceRequest request = ExportLogsServiceRequest.newBuilder()
.addResourceLogs(resourceLogs)
.addResourceLogs(resourceLogs)
.build();
final byte[] requestSerialized = getRequestSerialized(request);
final TelemetryContentType contentType = TelemetryContentType.APPLICATION_JSON;
try (HttpResponseEntity httpResponseEntity = webClientService.post()
.uri(uri)
.body(new ByteArrayInputStream(requestSerialized), OptionalLong.of(requestSerialized.length))
.header(CONTENT_TYPE_HEADER, contentType.getContentType())
.retrieve()
) {
assertResponseSuccess(contentType, httpResponseEntity);
final RequestMapper requestMapper = new StandardRequestMapper();
final ExportLogsServiceResponse serviceResponse = requestMapper.readValue(httpResponseEntity.body(), ExportLogsServiceResponse.class);
final ExportLogsPartialSuccess partialSuccess = serviceResponse.getPartialSuccess();
assertNotNull(partialSuccess);
assertEquals(queueCapacity, partialSuccess.getRejectedLogRecords());
}
runner.run(1, false, false);
assertFlowFileFound(requestType);
}
@Test
void testPostProtobufUrlConnection() throws Exception {
startServer();
final TelemetryRequestType requestType = TelemetryRequestType.LOGS;
final URI uri = getUri(requestType.getPath());
final ResourceLogs resourceLogs = ResourceLogs.newBuilder().build();
final ExportLogsServiceRequest request = ExportLogsServiceRequest.newBuilder()
.addResourceLogs(resourceLogs)
.build();
final byte[] protobufRequest = request.toByteArray();
final TelemetryContentType contentType = TelemetryContentType.APPLICATION_PROTOBUF;
final HttpsURLConnection connection = (HttpsURLConnection) uri.toURL().openConnection();
connection.setSSLSocketFactory(sslContext.getSocketFactory());
connection.setRequestMethod(HttpMethod.POST.name());
connection.setConnectTimeout(Math.toIntExact(CONNECT_TIMEOUT.toMillis()));
connection.setReadTimeout(Math.toIntExact(READ_TIMEOUT.toMillis()));
connection.setRequestProperty(CONTENT_TYPE_HEADER, contentType.getContentType());
connection.setDoOutput(true);
try (OutputStream connectionOutputStream = connection.getOutputStream()) {
connectionOutputStream.write(protobufRequest);
}
final int responseCode = connection.getResponseCode();
assertEquals(HttpURLConnection.HTTP_OK, responseCode);
connection.disconnect();
runner.run(1, false, false);
assertFlowFileFound(requestType);
}
private byte[] getRequestSerialized(final Message message) throws IOException {
final RequestMapper requestMapper = new StandardRequestMapper();
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
requestMapper.writeValue(outputStream, message);
return outputStream.toByteArray();
}
private byte[] getRequestCompressed(final byte[] serialized) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(outputStream)) {
gzipOutputStream.write(serialized);
}
return outputStream.toByteArray();
}
private void assertFlowFileFound(final TelemetryRequestType requestType) {
final Iterator<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenOTLP.SUCCESS).iterator();
assertTrue(flowFiles.hasNext());
final MockFlowFile flowFile = flowFiles.next();
flowFile.assertAttributeEquals(TelemetryAttributeName.MIME_TYPE, TelemetryContentType.APPLICATION_JSON.getContentType());
flowFile.assertAttributeEquals(TelemetryAttributeName.RESOURCE_COUNT, Integer.toString(1));
flowFile.assertAttributeEquals(TelemetryAttributeName.RESOURCE_TYPE, requestType.name());
final String content = flowFile.getContent();
assertTrue(content.contains(JSON_STRING_VALUE_LOCALHOST));
}
private void assertResponseSuccess(final TelemetryContentType expectedContentType, final HttpResponseEntity httpResponseEntity) {
assertEquals(HttpResponseStatus.OK.getCode(), httpResponseEntity.statusCode());
final HttpEntityHeaders headers = httpResponseEntity.headers();
final Optional<String> firstContentType = headers.getFirstHeader(CONTENT_TYPE_HEADER);
assertTrue(firstContentType.isPresent());
assertEquals(expectedContentType.getContentType(), firstContentType.get());
}
private void startServer() throws InitializationException {
runner.setProperty(ListenOTLP.ADDRESS, LOCALHOST);
runner.setProperty(ListenOTLP.PORT, RANDOM_PORT);
setSslContextService();
when(sslContextService.createContext()).thenReturn(sslContext);
runner.run(1, false, true);
}
private URI getUri(final String contextPath) {
final int httpPort = processor.getPort();
final String httpUrl = String.format(HTTP_URL_FORMAT, httpPort, contextPath);
return URI.create(httpUrl);
}
private void setSslContextService() throws InitializationException {
when(sslContextService.getIdentifier()).thenReturn(SERVICE_ID);
runner.addControllerService(SERVICE_ID, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ListenOTLP.SSL_CONTEXT_SERVICE, SERVICE_ID);
runner.setProperty(ListenOTLP.CLIENT_AUTHENTICATION, ClientAuth.WANT.name());
}
private InputStream getRequestBody() {
return new ByteArrayInputStream(new byte[]{});
}
private byte[] getResponseBody(final InputStream inputStream) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
int read = inputStream.read();
while (read != -1) {
outputStream.write(read);
read = inputStream.read();
}
return outputStream.toByteArray();
}
}

View File

@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.opentelemetry.encoding;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.Span;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
class StandardRequestMapperTest {
private static final byte[] SPAN_ID = new byte[]{-18, -31, -101, 126, -61, -63, -79, 116};
private static final String SPAN_ID_ENCODED = "EEE19B7EC3C1B174";
private static final byte[] TRACE_ID = new byte[]{91, -114, -1, -9, -104, 3, -127, 3, -46, 105, -74, 51, -127, 63, -58, 12};
private static final String TRACE_ID_ENCODED = "5B8EFFF798038103D269B633813FC60C";
private static final byte[] PARENT_SPAN_ID = new byte[]{-18, -31, -101, 126, -61, -63, -79, 115};
private static final String PARENT_SPAN_ID_ENCODED = "EEE19B7EC3C1B173";
private static final Span.SpanKind SPAN_KIND = Span.SpanKind.SPAN_KIND_SERVER;
private static final ObjectMapper objectMapper = new ObjectMapper();
private final StandardRequestMapper requestMapper = new StandardRequestMapper();
@Test
void testWriteValueTraces() throws IOException {
final ExportTraceServiceRequest request = getTraceRequest();
final byte[] serialized = getSerialized(request);
final JsonNode rootNode = objectMapper.readTree(serialized);
final JsonNode resourceSpansNode = rootNode.get("resourceSpans");
assertNotNull(resourceSpansNode);
final JsonNode firstResourceSpan = resourceSpansNode.get(0);
assertNotNull(firstResourceSpan);
final JsonNode scopeSpansNode = firstResourceSpan.get("scopeSpans");
assertNotNull(scopeSpansNode);
final JsonNode firstScopeSpan = scopeSpansNode.get(0);
assertNotNull(firstScopeSpan);
final JsonNode spansNode = firstScopeSpan.get("spans");
assertNotNull(spansNode);
final JsonNode firstSpan = spansNode.get(0);
final String firstSpanId = firstSpan.get("spanId").asText();
assertEquals(SPAN_ID_ENCODED, firstSpanId);
final String firstTraceId = firstSpan.get("traceId").asText();
assertEquals(TRACE_ID_ENCODED, firstTraceId);
final String firstParentSpanId = firstSpan.get("parentSpanId").asText();
assertEquals(PARENT_SPAN_ID_ENCODED, firstParentSpanId);
final int firstSpanKind = firstSpan.get("kind").asInt();
assertEquals(SPAN_KIND.ordinal(), firstSpanKind);
}
@Test
void testReadValueTraces() throws IOException {
final ExportTraceServiceRequest request = getTraceRequest();
final byte[] serialized = getSerialized(request);
final ByteArrayInputStream inputStream = new ByteArrayInputStream(serialized);
final ExportTraceServiceRequest deserialized = requestMapper.readValue(inputStream, ExportTraceServiceRequest.class);
assertNotNull(deserialized);
final ResourceSpans resourceSpans = deserialized.getResourceSpans(0);
final ScopeSpans scopeSpans = resourceSpans.getScopeSpans(0);
final Span span = scopeSpans.getSpans(0);
assertArrayEquals(SPAN_ID, span.getSpanId().toByteArray());
assertArrayEquals(TRACE_ID, span.getTraceId().toByteArray());
assertArrayEquals(PARENT_SPAN_ID, span.getParentSpanId().toByteArray());
}
private byte[] getSerialized(final Message message) throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
requestMapper.writeValue(outputStream, message);
return outputStream.toByteArray();
}
private ExportTraceServiceRequest getTraceRequest() {
final Span span = Span.newBuilder()
.setSpanId(ByteString.copyFrom(SPAN_ID))
.setTraceId(ByteString.copyFrom(TRACE_ID))
.setParentSpanId(ByteString.copyFrom(PARENT_SPAN_ID))
.setKind(SPAN_KIND)
.build();
final ScopeSpans scopeSpans = ScopeSpans.newBuilder().addSpans(span).build();
final ResourceSpans resourceSpans = ResourceSpans.newBuilder().addScopeSpans(scopeSpans).build();
return ExportTraceServiceRequest.newBuilder().addResourceSpans(resourceSpans).build();
}
}

View File

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-opentelemetry-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-opentelemetry-processors</module>
<module>nifi-opentelemetry-nar</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-bom</artifactId>
<version>3.24.3</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- Override Guava from jackson-datatype-protobuf -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.2-jre</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -113,6 +113,7 @@
<module>nifi-cipher-bundle</module>
<module>nifi-py4j-bundle</module>
<module>nifi-compress-bundle</module>
<module>nifi-opentelemetry-bundle</module>
</modules>
<build>