From 4d4a5ca4be2f98e82ccf0b96290b27e0b06f277c Mon Sep 17 00:00:00 2001 From: Nandor Soma Abonyi Date: Thu, 7 Jul 2022 17:54:11 +0200 Subject: [PATCH] NIFI-10251 Add v5 protocol support for existing MQTT processors This closes #6225. Signed-off-by: Peter Turcsanyi --- nifi-assembly/NOTICE | 299 +++++++++++++++++ .../src/main/resources/META-INF/NOTICE | 307 ++++++++++++++++++ .../nifi-mqtt-processors/pom.xml | 29 +- .../nifi/processors/mqtt/ConsumeMQTT.java | 184 +++++------ .../nifi/processors/mqtt/PublishMQTT.java | 58 ++-- .../mqtt/adapters/HiveMqV5ClientAdapter.java | 200 ++++++++++++ .../mqtt/adapters/PahoMqttClientAdapter.java | 191 +++++++++++ .../mqtt/common/AbstractMQTTProcessor.java | 281 ++++++++-------- .../processors/mqtt/common/MqttCallback.java | 23 ++ .../processors/mqtt/common/MqttClient.java | 69 ++++ .../mqtt/common/MqttClientFactory.java | 37 +++ .../mqtt/common/MqttClientProperties.java | 164 ++++++++++ .../processors/mqtt/common/MqttConstants.java | 19 +- .../processors/mqtt/common/MqttException.java | 28 ++ .../mqtt/common/MqttProtocolScheme.java | 24 ++ .../processors/mqtt/common/MqttVersion.java | 51 +++ .../mqtt/common/ReceivedMqttMessage.java | 38 +++ ...eMessage.java => StandardMqttMessage.java} | 33 +- .../nifi/processors/mqtt/TestConsumeMQTT.java | 24 +- .../nifi/processors/mqtt/TestPublishMQTT.java | 22 +- .../mqtt/common/MqttTestClient.java | 244 ++------------ .../mqtt/common/TestConsumeMqttCommon.java | 24 +- 22 files changed, 1765 insertions(+), 584 deletions(-) create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java rename nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/{MQTTQueueMessage.java => StandardMqttMessage.java} (64%) diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 242b7c4305..b62e10a256 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -1565,6 +1565,275 @@ The following binary components are provided under the Apache Software License v * http://tomcat.apache.org/native-doc/ * https://svn.apache.org/repos/asf/tomcat/native/ + (ASLv2) The Netty Project (4.1.77.Final) + The following NOTICE information applies: + netty/netty + Copyright 2014 The Netty Project + ------------------------------------------------------------------------------- + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * https://netty.io/ + + Copyright 2014 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE..txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified portion of 'Webbit', an event based + WebSocket and HTTP server, which can be obtained at: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product contains a modified portion of 'SLF4J', a simple logging + facade for Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * https://www.slf4j.org/ + + This product contains a modified portion of 'Apache Harmony', an open source + Java SE, which can be obtained at: + + * NOTICE: + * license/NOTICE.harmony.txt + * LICENSE: + * license/LICENSE.harmony.txt (Apache License 2.0) + * HOMEPAGE: + * https://archive.apache.org/dist/harmony/ + + This product contains a modified portion of 'jbzip2', a Java bzip2 compression + and decompression library written by Matthew J. Francis. It can be obtained at: + + * LICENSE: + * license/LICENSE.jbzip2.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jbzip2/ + + This product contains a modified portion of 'libdivsufsort', a C API library to construct + the suffix array and the Burrows-Wheeler transformed string for any input string of + a constant-size alphabet written by Yuta Mori. It can be obtained at: + + * LICENSE: + * license/LICENSE.libdivsufsort.txt (MIT License) + * HOMEPAGE: + * https://github.com/y-256/libdivsufsort + + This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jctools.txt (ASL2 License) + * HOMEPAGE: + * https://github.com/JCTools/JCTools + + This product optionally depends on 'JZlib', a re-implementation of zlib in + pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product optionally depends on 'Compress-LZF', a Java library for encoding and + decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: + + * LICENSE: + * license/LICENSE.compress-lzf.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/ning/compress + + This product optionally depends on 'lz4', a LZ4 Java compression + and decompression library written by Adrien Grand. It can be obtained at: + + * LICENSE: + * license/LICENSE.lz4.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jpountz/lz4-java + + This product optionally depends on 'lzma-java', a LZMA Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.lzma-java.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jponge/lzma-java + + This product optionally depends on 'zstd-jni', a zstd-jni Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.zstd-jni.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/luben/zstd-jni + + This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression + and decompression library written by William Kinney. It can be obtained at: + + * LICENSE: + * license/LICENSE.jfastlz.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jfastlz/ + + This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/protobuf + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * https://www.bouncycastle.org/ + + This product optionally depends on 'Snappy', a compression library produced + by Google Inc, which can be obtained at: + + * LICENSE: + * license/LICENSE.snappy.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/snappy + + This product optionally depends on 'JBoss Marshalling', an alternative Java + serialization API, which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-marshalling.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jboss-remoting/jboss-marshalling + + This product optionally depends on 'Caliper', Google's micro- + benchmarking framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.caliper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/google/caliper + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, which + can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://logging.apache.org/log4j/ + + This product optionally depends on 'Aalto XML', an ultra-high performance + non-blocking XML processor, which can be obtained at: + + * LICENSE: + * license/LICENSE.aalto-xml.txt (Apache License 2.0) + * HOMEPAGE: + * https://wiki.fasterxml.com/AaltoHome + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at: + + * LICENSE: + * license/LICENSE.hpack.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/twitter/hpack + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at: + + * LICENSE: + * license/LICENSE.hyper-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/python-hyper/hpack/ + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at: + + * LICENSE: + * license/LICENSE.nghttp2-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/nghttp2/nghttp2/ + + This product contains a modified portion of 'Apache Commons Lang', a Java library + provides utilities for the java.lang API, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-lang.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/proper/commons-lang/ + + + This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build. + + * LICENSE: + * license/LICENSE.mvn-wrapper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/takari/maven-wrapper + + This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS. + This private header is also used by Apple's open source + mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/). + + * LICENSE: + * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0) + * HOMEPAGE: + * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h + + This product optionally depends on 'Brotli4j', Brotli compression and + decompression for Java., which can be obtained at: + + * LICENSE: + * license/LICENSE.brotli4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/hyperxpro/Brotli4j + (ASLv2) Error Prone The following NOTICE information applies: Copyright 2017 Google Inc. @@ -1958,6 +2227,36 @@ The following binary components are provided under the Apache Software License v Copyright (c) 2014-2019 Appsicle Copyright (c) 2019-2020 QuestDB + (ASLv2) HiveMQ MQTT Client + The following NOTICE information applies: + HiveMQ MQTT Client + Copyright 2018-2022 HiveMQ and the HiveMQ Community + + (ASLv2) ReactiveX/RxJava + The following NOTICE information applies: + ReactiveX/RxJava + Copyright 2018-present RxJava Contributors + + (ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core) + The following NOTICE information applies: + JCTools/JCTools + Copyright + + (ASLv2) JetBrains/java-annotations + The following NOTICE information applies: + JetBrains/java-annotations + Copyright 2000-2016 JetBrains s.r.o. + + (ASLv2) google/dagger + The following NOTICE information applies: + google/dagger + Copyright 2012 The Dagger Authors + + (ASLv2) atinject (javax.inject:javax.inject) + The following NOTICE information applies: + atinject + Copyright + ************************ Common Development and Distribution License 1.1 ************************ diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE index f95db89b8c..306617c48e 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE @@ -41,6 +41,305 @@ The following binary components are provided under the Apache Software License v in some artifacts (usually source distributions); but is always available from the source code management (SCM) system project uses. + (ASLv2) HiveMQ MQTT Client + The following NOTICE information applies: + HiveMQ MQTT Client + Copyright 2018-2022 HiveMQ and the HiveMQ Community + + (ASLv2) ReactiveX/RxJava + The following NOTICE information applies: + ReactiveX/RxJava + Copyright 2018-present RxJava Contributors + + (ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core) + The following NOTICE information applies: + JCTools/JCTools + Copyright + + (ASLv2) JetBrains/java-annotations + The following NOTICE information applies: + JetBrains/java-annotations + Copyright 2000-2016 JetBrains s.r.o. + + (ASLv2) google/dagger + The following NOTICE information applies: + google/dagger + Copyright 2012 The Dagger Authors + + (ASLv2) atinject (javax.inject:javax.inject) + The following NOTICE information applies: + atinject + Copyright + + (ASLv2) The Netty Project + The following NOTICE information applies: + netty/netty + Copyright 2014 The Netty Project + ------------------------------------------------------------------------------- + The Netty Project + ================= + + Please visit the Netty web site for more information: + + * https://netty.io/ + + Copyright 2014 The Netty Project + + The Netty Project licenses this file to you under the Apache License, + version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at: + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + License for the specific language governing permissions and limitations + under the License. + + Also, please refer to each LICENSE..txt file, which is located in + the 'license' directory of the distribution file, for the license terms of the + components that this product depends on. + + ------------------------------------------------------------------------------- + This product contains the extensions to Java Collections Framework which has + been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene: + + * LICENSE: + * license/LICENSE.jsr166y.txt (Public Domain) + * HOMEPAGE: + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/ + * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/ + + This product contains a modified version of Robert Harder's Public Domain + Base64 Encoder and Decoder, which can be obtained at: + + * LICENSE: + * license/LICENSE.base64.txt (Public Domain) + * HOMEPAGE: + * http://iharder.sourceforge.net/current/java/base64/ + + This product contains a modified portion of 'Webbit', an event based + WebSocket and HTTP server, which can be obtained at: + + * LICENSE: + * license/LICENSE.webbit.txt (BSD License) + * HOMEPAGE: + * https://github.com/joewalnes/webbit + + This product contains a modified portion of 'SLF4J', a simple logging + facade for Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.slf4j.txt (MIT License) + * HOMEPAGE: + * https://www.slf4j.org/ + + This product contains a modified portion of 'Apache Harmony', an open source + Java SE, which can be obtained at: + + * NOTICE: + * license/NOTICE.harmony.txt + * LICENSE: + * license/LICENSE.harmony.txt (Apache License 2.0) + * HOMEPAGE: + * https://archive.apache.org/dist/harmony/ + + This product contains a modified portion of 'jbzip2', a Java bzip2 compression + and decompression library written by Matthew J. Francis. It can be obtained at: + + * LICENSE: + * license/LICENSE.jbzip2.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jbzip2/ + + This product contains a modified portion of 'libdivsufsort', a C API library to construct + the suffix array and the Burrows-Wheeler transformed string for any input string of + a constant-size alphabet written by Yuta Mori. It can be obtained at: + + * LICENSE: + * license/LICENSE.libdivsufsort.txt (MIT License) + * HOMEPAGE: + * https://github.com/y-256/libdivsufsort + + This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jctools.txt (ASL2 License) + * HOMEPAGE: + * https://github.com/JCTools/JCTools + + This product optionally depends on 'JZlib', a re-implementation of zlib in + pure Java, which can be obtained at: + + * LICENSE: + * license/LICENSE.jzlib.txt (BSD style License) + * HOMEPAGE: + * http://www.jcraft.com/jzlib/ + + This product optionally depends on 'Compress-LZF', a Java library for encoding and + decoding data in LZF format, written by Tatu Saloranta. It can be obtained at: + + * LICENSE: + * license/LICENSE.compress-lzf.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/ning/compress + + This product optionally depends on 'lz4', a LZ4 Java compression + and decompression library written by Adrien Grand. It can be obtained at: + + * LICENSE: + * license/LICENSE.lz4.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jpountz/lz4-java + + This product optionally depends on 'lzma-java', a LZMA Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.lzma-java.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jponge/lzma-java + + This product optionally depends on 'zstd-jni', a zstd-jni Java compression + and decompression library, which can be obtained at: + + * LICENSE: + * license/LICENSE.zstd-jni.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/luben/zstd-jni + + This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression + and decompression library written by William Kinney. It can be obtained at: + + * LICENSE: + * license/LICENSE.jfastlz.txt (MIT License) + * HOMEPAGE: + * https://code.google.com/p/jfastlz/ + + This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data + interchange format, which can be obtained at: + + * LICENSE: + * license/LICENSE.protobuf.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/protobuf + + This product optionally depends on 'Bouncy Castle Crypto APIs' to generate + a temporary self-signed X.509 certificate when the JVM does not provide the + equivalent functionality. It can be obtained at: + + * LICENSE: + * license/LICENSE.bouncycastle.txt (MIT License) + * HOMEPAGE: + * https://www.bouncycastle.org/ + + This product optionally depends on 'Snappy', a compression library produced + by Google Inc, which can be obtained at: + + * LICENSE: + * license/LICENSE.snappy.txt (New BSD License) + * HOMEPAGE: + * https://github.com/google/snappy + + This product optionally depends on 'JBoss Marshalling', an alternative Java + serialization API, which can be obtained at: + + * LICENSE: + * license/LICENSE.jboss-marshalling.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/jboss-remoting/jboss-marshalling + + This product optionally depends on 'Caliper', Google's micro- + benchmarking framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.caliper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/google/caliper + + This product optionally depends on 'Apache Commons Logging', a logging + framework, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-logging.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/logging/ + + This product optionally depends on 'Apache Log4J', a logging framework, which + can be obtained at: + + * LICENSE: + * license/LICENSE.log4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://logging.apache.org/log4j/ + + This product optionally depends on 'Aalto XML', an ultra-high performance + non-blocking XML processor, which can be obtained at: + + * LICENSE: + * license/LICENSE.aalto-xml.txt (Apache License 2.0) + * HOMEPAGE: + * https://wiki.fasterxml.com/AaltoHome + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at: + + * LICENSE: + * license/LICENSE.hpack.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/twitter/hpack + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at: + + * LICENSE: + * license/LICENSE.hyper-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/python-hyper/hpack/ + + This product contains a modified version of 'HPACK', a Java implementation of + the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at: + + * LICENSE: + * license/LICENSE.nghttp2-hpack.txt (MIT License) + * HOMEPAGE: + * https://github.com/nghttp2/nghttp2/ + + This product contains a modified portion of 'Apache Commons Lang', a Java library + provides utilities for the java.lang API, which can be obtained at: + + * LICENSE: + * license/LICENSE.commons-lang.txt (Apache License 2.0) + * HOMEPAGE: + * https://commons.apache.org/proper/commons-lang/ + + + This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build. + + * LICENSE: + * license/LICENSE.mvn-wrapper.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/takari/maven-wrapper + + This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS. + This private header is also used by Apple's open source + mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/). + + * LICENSE: + * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0) + * HOMEPAGE: + * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h + + This product optionally depends on 'Brotli4j', Brotli compression and + decompression for Java., which can be obtained at: + + * LICENSE: + * license/LICENSE.brotli4j.txt (Apache License 2.0) + * HOMEPAGE: + * https://github.com/hyperxpro/Brotli4j + ************************ Eclipse Public License 1.0 ************************ @@ -48,3 +347,11 @@ Eclipse Public License 1.0 The following binary components are provided under the Eclipse Public License 1.0. See project link for details. (EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0 - https://github.com/eclipse/paho.mqtt.java) + +***************** +Public Domain +***************** + +The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details. + + (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 - http://www.reactive-streams.org/) diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml index 2ed90145e7..3e9bed91ef 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml @@ -48,6 +48,11 @@ org.apache.nifi nifi-record + + org.apache.nifi + nifi-security-utils + 1.18.0-SNAPSHOT + @@ -55,6 +60,11 @@ org.eclipse.paho.client.mqttv3 1.2.2 + + com.hivemq + hivemq-mqtt-client + 1.3.0 + org.apache.commons commons-lang3 @@ -84,23 +94,4 @@ test - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - **/integration/TestConsumeMQTT.java - **/integration/TestConsumeMqttSSL.java - **/integration/TestPublishAndSubscribeMqttIntegration.java - **/integration/TestPublishMQTT.java - **/integration/TestPublishMqttSSL.java - - - - - diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index 7c5a7ff49c..46d1452cea 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -42,10 +42,11 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; -import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.apache.nifi.processors.mqtt.common.MqttCallback; +import org.apache.nifi.processors.mqtt.common.MqttException; +import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; @@ -58,10 +59,6 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -209,7 +206,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { private volatile String topicFilter; private final AtomicBoolean scheduled = new AtomicBoolean(false); - private volatile LinkedBlockingQueue mqttQueue; + private volatile LinkedBlockingQueue mqttQueue; public static final Relationship REL_MESSAGE = new Relationship.Builder() .name("Message") @@ -226,7 +223,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { private static final List descriptors; private static final Set relationships; - static{ + static { final List innerDescriptorsList = getAbstractPropertyDescriptors(); innerDescriptorsList.add(PROP_GROUPID); innerDescriptorsList.add(PROP_TOPIC_FILTER); @@ -238,7 +235,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { innerDescriptorsList.add(MESSAGE_DEMARCATOR); descriptors = Collections.unmodifiableList(innerDescriptorsList); - final Set innerRelationshipsSet = new HashSet(); + final Set innerRelationshipsSet = new HashSet<>(); innerRelationshipsSet.add(REL_MESSAGE); innerRelationshipsSet.add(REL_PARSE_FAILURE); relationships = Collections.unmodifiableSet(innerRelationshipsSet); @@ -249,15 +246,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { // resize the receive buffer, but preserve data if (descriptor == PROP_MAX_QUEUE_SIZE) { // it's a mandatory integer, never null - int newSize = Integer.valueOf(newValue); + int newSize = Integer.parseInt(newValue); if (mqttQueue != null) { int msgPending = mqttQueue.size(); if (msgPending > newSize) { - logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", - new Object[]{newSize, msgPending}); + logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", newSize, msgPending); return; } - LinkedBlockingQueue newBuffer = new LinkedBlockingQueue<>(newSize); + LinkedBlockingQueue newBuffer = new LinkedBlockingQueue<>(newSize); mqttQueue.drainTo(newBuffer); mqttQueue = newBuffer; } @@ -297,15 +293,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { final boolean readerIsSet = context.getProperty(RECORD_READER).isSet(); final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet(); - if((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) { + if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) { results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false) - .explanation("Both Record Reader and Writer must be set when used").build()); + .explanation("both Record Reader and Writer must be set when used.").build()); } final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet(); - if(readerIsSet && demarcatorIsSet) { + if (readerIsSet && demarcatorIsSet) { results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false) - .explanation("You cannot use both a demarcator and a Reader/Writer").build()); + .explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build()); } return results; @@ -346,17 +342,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { public void onUnscheduled(final ProcessContext context) { scheduled.set(false); synchronized (this) { - super.onStopped(); + stopClient(); } } @OnStopped - public void onStopped(final ProcessContext context) throws IOException { - if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) { + public void onStopped(final ProcessContext context) { + if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) { logger.info("Finishing processing leftover messages"); ProcessSession session = processSessionFactory.createSession(); - if(context.getProperty(RECORD_READER).isSet()) { + if (context.getProperty(RECORD_READER).isSet()) { transferQueueRecord(context, session); } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { transferQueueDemarcator(context, session); @@ -364,7 +360,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { transferQueue(session); } } else { - if (mqttQueue!= null && !mqttQueue.isEmpty()){ + if (mqttQueue != null && !mqttQueue.isEmpty()) { throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " + "clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages " + "in the MQTT internal queue cannot finish processing until until the processor is triggered to run."); @@ -375,7 +371,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final boolean isScheduled = scheduled.get(); - if (!isConnected() && isScheduled){ + if (!isConnected() && isScheduled) { synchronized (this) { if (!isConnected()) { initializeClient(context); @@ -402,42 +398,27 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { // non-null but not connected, so we need to handle each case and only create a new client when it is null try { if (mqttClient == null) { - logger.debug("Creating client"); - mqttClient = createMqttClient(broker, clientID, persistence); + mqttClient = createMqttClient(); mqttClient.setCallback(this); } if (!mqttClient.isConnected()) { - logger.debug("Connecting client"); - mqttClient.connect(connOpts); + mqttClient.connect(); mqttClient.subscribe(topicPrefix + topicFilter, qos); } - } catch (MqttException e) { - logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e); + } catch (Exception e) { + logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{clientProperties.getBroker()}, e); + mqttClient = null; // prevent stucked processor when subscribe fails context.yield(); } } - private void transferQueue(ProcessSession session){ + private void transferQueue(ProcessSession session) { while (!mqttQueue.isEmpty()) { - final MQTTQueueMessage mqttMessage = mqttQueue.peek(); - FlowFile messageFlowfile = session.create(); + final ReceivedMqttMessage mqttMessage = mqttQueue.peek(); - Map attrs = new HashMap<>(); - attrs.put(BROKER_ATTRIBUTE_KEY, broker); - attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic()); - attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos())); - attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate())); - attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained())); - - messageFlowfile = session.putAllAttributes(messageFlowfile, attrs); - - messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()); - } - }); + final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage), + out -> out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload())); session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic())); session.transfer(messageFlowfile, REL_MESSAGE); @@ -446,17 +427,16 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { } } - private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){ + private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session) { final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8); FlowFile messageFlowfile = session.create(); - session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker); - + session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker()); messageFlowfile = session.append(messageFlowfile, out -> { int i = 0; while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) { - final MQTTQueueMessage mqttMessage = mqttQueue.poll(); + final ReceivedMqttMessage mqttMessage = mqttQueue.poll(); out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()); out.write(demarcator); session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); @@ -469,41 +449,40 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { session.commitAsync(); } - private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) { - FlowFile messageFlowfile = session.create(); - - Map attrs = new HashMap<>(); - attrs.put(BROKER_ATTRIBUTE_KEY, broker); - attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic()); - attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos())); - attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate())); - attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained())); - - messageFlowfile = session.putAllAttributes(messageFlowfile, attrs); - - messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(mqttMessage.getPayload()); - } - }); + private void transferFailure(final ProcessSession session, final ReceivedMqttMessage mqttMessage) { + final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage), + out -> out.write(mqttMessage.getPayload())); session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic())); session.transfer(messageFlowfile, REL_PARSE_FAILURE); session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false); } - private void transferQueueRecord(final ProcessContext context, final ProcessSession session){ + private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, ReceivedMqttMessage mqttMessage) { + FlowFile messageFlowfile = session.create(); + + Map attrs = new HashMap<>(); + attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker()); + attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic()); + attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos())); + attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate())); + attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained())); + + messageFlowfile = session.putAllAttributes(messageFlowfile, attrs); + return messageFlowfile; + } + + private void transferQueueRecord(final ProcessContext context, final ProcessSession session) { final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); FlowFile flowFile = session.create(); - session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker); + session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker()); final Map attributes = new HashMap<>(); final AtomicInteger recordCount = new AtomicInteger(); - final List doneList = new ArrayList(); + final List doneList = new ArrayList<>(); RecordSetWriter writer = null; boolean isWriterInitialized = false; @@ -511,8 +490,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { try { while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) { - final MQTTQueueMessage mqttMessage = mqttQueue.poll(); - if(mqttMessage == null) { + final ReceivedMqttMessage mqttMessage = mqttQueue.poll(); + if (mqttMessage == null) { break; } @@ -533,16 +512,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { Record record; while ((record = reader.nextRecord()) != null) { - if(!isWriterInitialized) { + if (!isWriterInitialized) { final RecordSchema recordSchema = record.getSchema(); final OutputStream rawOut = session.write(flowFile); RecordSchema writeSchema; try { writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema); - if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { - final List fields = new ArrayList<>(); - fields.addAll(writeSchema.getFields()); + if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { + final List fields = new ArrayList<>(writeSchema.getFields()); fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType())); fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType())); @@ -562,7 +540,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { } try { - if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { + if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic()); record.setValue(QOS_FIELD_KEY, mqttMessage.getQos()); record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained()); @@ -583,16 +561,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { } catch (final IOException | MalformedRecordException | SchemaValidationException e) { logger.error("Failed to write message, sending to the parse failure relationship", e); transferFailure(session, mqttMessage); - continue; } } catch (Exception e) { logger.error("Failed to write message, sending to the parse failure relationship", e); transferFailure(session, mqttMessage); - continue; } } - if(writer != null) { + if (writer != null) { final WriteResult writeResult = writer.finishRecordSet(); attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); @@ -605,26 +581,26 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { // we try to add the messages back into the internal queue int numberOfMessages = 0; - for(MQTTQueueMessage done : doneList) { + for (ReceivedMqttMessage done : doneList) { try { mqttQueue.offer(done, 1, TimeUnit.SECONDS); } catch (InterruptedException ex) { numberOfMessages++; - if(getLogger().isDebugEnabled()) { + if (getLogger().isDebugEnabled()) { logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex); } } } - if(numberOfMessages > 0) { - logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages}); + if (numberOfMessages > 0) { + logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", numberOfMessages); } - throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e); + throw new ProcessException("Could not process data received from the MQTT broker(s): " + clientProperties.getBroker(), e); } finally { closeWriter(writer); } - if(recordCount.get() == 0) { + if (recordCount.get() == 0) { session.remove(flowFile); return; } @@ -635,7 +611,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { final int count = recordCount.get(); session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false); - getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile}); + getLogger().info("Successfully processed {} records for {}", count, flowFile); } private void closeWriter(final RecordSetWriter writer) { @@ -649,8 +625,9 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { } private String getTransitUri(String... appends) { - StringBuilder stringBuilder = new StringBuilder(brokerUri); - for(String append : appends) { + String broker = clientProperties.getBrokerUri().toString(); + StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ? broker : broker + "/"); + for (String append : appends) { stringBuilder.append(append); } return stringBuilder.toString(); @@ -658,29 +635,34 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { @Override public void connectionLost(Throwable cause) { - logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); + logger.error("Connection to {} lost", clientProperties.getBroker(), cause); } @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { + public void messageArrived(ReceivedMqttMessage message) { if (logger.isDebugEnabled()) { byte[] payload = message.getPayload(); - String text = new String(payload, "UTF-8"); + String text = new String(payload, StandardCharsets.UTF_8); if (StringUtils.isAsciiPrintable(text)) { - logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text}); + logger.debug("Message arrived from topic {}. Payload: {}", message.getTopic(), text); } else { - logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length}); + logger.debug("Message arrived from topic {}. Binary value of size {}", message.getTopic(), payload.length); } } - if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1, TimeUnit.SECONDS)) { - throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); + try { + if (!mqttQueue.offer(message, 1, TimeUnit.SECONDS)) { + throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); + } + } catch (InterruptedException e) { + throw new MqttException("Failed to process message arrived from topic " + message.getTopic()); } } @Override - public void deliveryComplete(IMqttDeliveryToken token) { - logger.warn("Received MQTT 'delivery complete' message to subscriber: " + token); + public void deliveryComplete(String token) { + // Unlikely situation. Api uses the same callback for publisher and consumer as well. + // That's why we have this log message here to indicate something really messy thing happened. + logger.error("Received MQTT 'delivery complete' message to subscriber. Token: [{}]", token); } - } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java index 0db7615d7e..2ea80b1ded 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java @@ -36,18 +36,15 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; +import org.apache.nifi.processors.mqtt.common.MqttCallback; +import org.apache.nifi.processors.mqtt.common.MqttException; +import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; +import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.StopWatch; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import java.io.IOException; -import java.io.InputStream; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -137,18 +134,18 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { @OnStopped public void onStopped(final ProcessContext context) { synchronized (this) { - super.onStopped(); + stopClient(); } } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowfile = session.get(); + final FlowFile flowfile = session.get(); if (flowfile == null) { return; } - if (!isConnected()){ + if (!isConnected()) { synchronized (this) { if (!isConnected()) { initializeClient(context); @@ -157,7 +154,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { } // get the MQTT topic - String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue(); + final String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue(); if (topic == null || topic.isEmpty()) { logger.warn("Evaluation of the topic property returned null or evaluated to be empty, routing to failure"); @@ -167,18 +164,11 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { // do the read final byte[] messageContent = new byte[(int) flowfile.getSize()]; - session.read(flowfile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, messageContent, true); - } - }); + session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true)); int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger(); - final MqttMessage mqttMessage = new MqttMessage(messageContent); - mqttMessage.setQos(qos); - mqttMessage.setPayload(messageContent); - mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean()); + boolean retained = context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean(); + final StandardMqttMessage mqttMessage = new StandardMqttMessage(messageContent, qos, retained); try { final StopWatch stopWatch = new StopWatch(true); @@ -188,9 +178,9 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { */ mqttClient.publish(topic, mqttMessage); - session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowfile, REL_SUCCESS); - } catch(MqttException me) { + } catch (MqttException me) { logger.error("Failed to publish message.", me); session.transfer(flowfile, REL_FAILURE); } @@ -201,35 +191,35 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { // non-null but not connected, so we need to handle each case and only create a new client when it is null try { if (mqttClient == null) { - logger.debug("Creating client"); - mqttClient = createMqttClient(broker, clientID, persistence); + mqttClient = createMqttClient(); mqttClient.setCallback(this); } if (!mqttClient.isConnected()) { - logger.debug("Connecting client"); - mqttClient.connect(connOpts); + mqttClient.connect(); } - } catch (MqttException e) { - logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e); + } catch (Exception e) { + logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", clientProperties.getBroker(), e); context.yield(); } } @Override public void connectionLost(Throwable cause) { - logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); + logger.error("Connection to {} lost", clientProperties.getBroker(), cause); } @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}"); + public void messageArrived(ReceivedMqttMessage message) { + // Unlikely situation. Api uses the same callback for publisher and consumer as well. + // That's why we have this log message here to indicate something really messy thing happened. + logger.error("Message arrived to a PublishMQTT processor { topic:'" + message.getTopic() + "; payload:" + Arrays.toString(message.getPayload()) + "}"); } @Override - public void deliveryComplete(IMqttDeliveryToken token) { + public void deliveryComplete(String token) { // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application. - logger.trace("Received 'delivery complete' message from broker for:" + token.toString()); + logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token); } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java new file mode 100644 index 0000000000..7532411055 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.adapters; + +import com.hivemq.client.mqtt.datatypes.MqttQos; +import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; +import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect; +import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder; +import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.mqtt.common.MqttCallback; +import org.apache.nifi.processors.mqtt.common.MqttClient; +import org.apache.nifi.processors.mqtt.common.MqttClientProperties; +import org.apache.nifi.processors.mqtt.common.MqttException; +import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; +import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; +import org.apache.nifi.security.util.KeyStoreUtils; +import org.apache.nifi.security.util.TlsException; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.SSL; +import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WS; +import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WSS; + +public class HiveMqV5ClientAdapter implements MqttClient { + + private final Mqtt5BlockingClient mqtt5BlockingClient; + private final MqttClientProperties clientProperties; + private final ComponentLog logger; + + private MqttCallback callback; + + public HiveMqV5ClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException { + this.mqtt5BlockingClient = createClient(clientProperties, logger); + this.clientProperties = clientProperties; + this.logger = logger; + } + + @Override + public boolean isConnected() { + return mqtt5BlockingClient.getState().isConnected(); + } + + @Override + public void connect() { + logger.debug("Connecting to broker"); + + final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder() + .keepAlive(clientProperties.getKeepAliveInterval()); + + final boolean cleanSession = clientProperties.isCleanSession(); + connectBuilder.cleanStart(cleanSession); + if (!cleanSession) { + connectBuilder.sessionExpiryInterval(clientProperties.getSessionExpiryInterval()); + } + + final String lastWillTopic = clientProperties.getLastWillTopic(); + if (lastWillTopic != null) { + connectBuilder.willPublish() + .topic(lastWillTopic) + .payload(clientProperties.getLastWillMessage().getBytes()) + .retain(clientProperties.getLastWillRetain()) + .qos(MqttQos.fromCode(clientProperties.getLastWillQos())) + .applyWillPublish(); + } + + final String username = clientProperties.getUsername(); + final String password = clientProperties.getPassword(); + if (username != null && password != null) { + connectBuilder.simpleAuth() + .username(clientProperties.getUsername()) + .password(password.getBytes(StandardCharsets.UTF_8)) + .applySimpleAuth(); + } + + final Mqtt5Connect mqtt5Connect = connectBuilder.build(); + mqtt5BlockingClient.connect(mqtt5Connect); + } + + @Override + public void disconnect() { + logger.debug("Disconnecting client"); + // Currently it is not possible to set timeout for disconnect with HiveMQ Client. + mqtt5BlockingClient.disconnect(); + } + + @Override + public void close() { + // there is no paho's close equivalent in hivemq client + } + + @Override + public void publish(String topic, StandardMqttMessage message) { + logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos()); + + mqtt5BlockingClient.publishWith() + .topic(topic) + .payload(message.getPayload()) + .retain(message.isRetained()) + .qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos()))) + .send(); + } + + @Override + public void subscribe(String topicFilter, int qos) { + Objects.requireNonNull(callback, "callback should be set"); + + logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos); + + CompletableFuture futureAck = mqtt5BlockingClient.toAsync().subscribeWith() + .topicFilter(topicFilter) + .qos(Objects.requireNonNull(MqttQos.fromCode(qos))) + .callback(mqtt5Publish -> { + final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage( + mqtt5Publish.getPayloadAsBytes(), + mqtt5Publish.getQos().getCode(), + mqtt5Publish.isRetain(), + mqtt5Publish.getTopic().toString()); + callback.messageArrived(receivedMessage); + }) + .send(); + + // Setting "listener" callback is only possible with async client, though sending subscribe message + // should happen in a blocking way to make sure the processor is blocked until ack is not arrived. + try { + Mqtt5SubAck ack = futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS); + logger.debug("Received mqtt5 subscribe ack: {}", ack); + } catch (Exception e) { + throw new MqttException("An error has occurred during sending subscribe message to broker", e); + } + } + + @Override + public void setCallback(MqttCallback callback) { + this.callback = callback; + } + + private static Mqtt5BlockingClient createClient(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException { + logger.debug("Creating Mqtt v5 client"); + + Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder() + .identifier(clientProperties.getClientId()) + .serverHost(clientProperties.getBrokerUri().getHost()); + + int port = clientProperties.getBrokerUri().getPort(); + if (port != -1) { + mqtt5ClientBuilder.serverPort(port); + } + + // default is tcp + if (WS.equals(clientProperties.getScheme()) || WSS.equals(clientProperties.getScheme())) { + mqtt5ClientBuilder.webSocketConfig().applyWebSocketConfig(); + } + + if (SSL.equals(clientProperties.getScheme())) { + if (clientProperties.getTlsConfiguration().getTruststorePath() != null) { + mqtt5ClientBuilder + .sslConfig() + .trustManagerFactory(KeyStoreUtils.loadTrustManagerFactory( + clientProperties.getTlsConfiguration().getTruststorePath(), + clientProperties.getTlsConfiguration().getTruststorePassword(), + clientProperties.getTlsConfiguration().getTruststoreType().getType())) + .applySslConfig(); + } + + if (clientProperties.getTlsConfiguration().getKeystorePath() != null) { + mqtt5ClientBuilder + .sslConfig() + .keyManagerFactory(KeyStoreUtils.loadKeyManagerFactory( + clientProperties.getTlsConfiguration().getKeystorePath(), + clientProperties.getTlsConfiguration().getKeystorePassword(), + null, + clientProperties.getTlsConfiguration().getKeystoreType().getType())) + .applySslConfig(); + } + } + + return mqtt5ClientBuilder.buildBlocking(); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java new file mode 100644 index 0000000000..90cdc5c1c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.adapters; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.mqtt.common.MqttCallback; +import org.apache.nifi.processors.mqtt.common.MqttClient; +import org.apache.nifi.processors.mqtt.common.MqttClientProperties; +import org.apache.nifi.processors.mqtt.common.MqttException; +import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; +import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; +import org.apache.nifi.security.util.TlsConfiguration; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.util.Properties; + +public class PahoMqttClientAdapter implements MqttClient { + + public static final int DISCONNECT_TIMEOUT = 5000; + + private final IMqttClient client; + private final MqttClientProperties clientProperties; + private final ComponentLog logger; + + public PahoMqttClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) { + this.client = createClient(clientProperties, logger); + this.clientProperties = clientProperties; + this.logger = logger; + } + + @Override + public boolean isConnected() { + return client.isConnected(); + } + + @Override + public void connect() { + logger.debug("Connecting to broker"); + + try { + final MqttConnectOptions connectOptions = new MqttConnectOptions(); + + connectOptions.setCleanSession(clientProperties.isCleanSession()); + connectOptions.setKeepAliveInterval(clientProperties.getKeepAliveInterval()); + connectOptions.setMqttVersion(clientProperties.getMqttVersion().getVersionCode()); + connectOptions.setConnectionTimeout(clientProperties.getConnectionTimeout()); + + final TlsConfiguration tlsConfiguration = clientProperties.getTlsConfiguration(); + if (tlsConfiguration != null) { + connectOptions.setSSLProperties(transformSSLContextService(tlsConfiguration)); + } + + final String lastWillTopic = clientProperties.getLastWillTopic(); + if (lastWillTopic != null) { + boolean lastWillRetain = clientProperties.getLastWillRetain() != null && clientProperties.getLastWillRetain(); + connectOptions.setWill(lastWillTopic, clientProperties.getLastWillMessage().getBytes(), clientProperties.getLastWillQos(), lastWillRetain); + } + + final String username = clientProperties.getUsername(); + if (username != null) { + connectOptions.setUserName(username); + connectOptions.setPassword(clientProperties.getPassword().toCharArray()); + } + + client.connect(connectOptions); + } catch (org.eclipse.paho.client.mqttv3.MqttException e) { + throw new MqttException("An error has occurred during connecting to broker", e); + } + } + + @Override + public void disconnect() { + logger.debug("Disconnecting client with timeout: {}", DISCONNECT_TIMEOUT); + + try { + client.disconnect(DISCONNECT_TIMEOUT); + } catch (org.eclipse.paho.client.mqttv3.MqttException e) { + throw new MqttException("An error has occurred during disconnecting client with timeout: " + DISCONNECT_TIMEOUT, e); + } + } + + @Override + public void close() { + logger.debug("Closing client"); + + try { + client.close(); + } catch (org.eclipse.paho.client.mqttv3.MqttException e) { + throw new MqttException("An error has occurred during closing client", e); + } + } + + @Override + public void publish(String topic, StandardMqttMessage message) { + logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos()); + + try { + client.publish(topic, message.getPayload(), message.getQos(), message.isRetained()); + } catch (org.eclipse.paho.client.mqttv3.MqttException e) { + throw new MqttException("An error has occurred during publishing message to " + topic + " with QoS: " + message.getQos(), e); + } + } + + @Override + public void subscribe(String topicFilter, int qos) { + logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos); + + try { + client.subscribe(topicFilter, qos); + } catch (org.eclipse.paho.client.mqttv3.MqttException e) { + throw new MqttException("An error has occurred during subscribing to " + topicFilter + " with QoS: " + qos, e); + } + } + + @Override + public void setCallback(MqttCallback callback) { + client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + callback.connectionLost(cause); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + logger.debug("Message arrived with id: {}", message.getId()); + final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic); + callback.messageArrived(receivedMessage); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + callback.deliveryComplete(token.toString()); + } + }); + } + + public static Properties transformSSLContextService(TlsConfiguration tlsConfiguration) { + final Properties properties = new Properties(); + if (tlsConfiguration.getProtocol() != null) { + properties.setProperty("com.ibm.ssl.protocol", tlsConfiguration.getProtocol()); + } + if (tlsConfiguration.getKeystorePath() != null) { + properties.setProperty("com.ibm.ssl.keyStore", tlsConfiguration.getKeystorePath()); + } + if (tlsConfiguration.getKeystorePassword() != null) { + properties.setProperty("com.ibm.ssl.keyStorePassword", tlsConfiguration.getKeystorePassword()); + } + if (tlsConfiguration.getKeystoreType() != null) { + properties.setProperty("com.ibm.ssl.keyStoreType", tlsConfiguration.getKeystoreType().getType()); + } + if (tlsConfiguration.getTruststorePath() != null) { + properties.setProperty("com.ibm.ssl.trustStore", tlsConfiguration.getTruststorePath()); + } + if (tlsConfiguration.getTruststorePassword() != null) { + properties.setProperty("com.ibm.ssl.trustStorePassword", tlsConfiguration.getTruststorePassword()); + } + if (tlsConfiguration.getTruststoreType() != null) { + properties.setProperty("com.ibm.ssl.trustStoreType", tlsConfiguration.getTruststoreType().getType()); + } + return properties; + } + + private static org.eclipse.paho.client.mqttv3.MqttClient createClient(MqttClientProperties clientProperties, ComponentLog logger) { + logger.debug("Creating Mqtt v3 client"); + + try { + return new org.eclipse.paho.client.mqttv3.MqttClient(clientProperties.getBroker(), clientProperties.getClientId(), new MemoryPersistence()); + } catch (org.eclipse.paho.client.mqttv3.MqttException e) { + throw new MqttException("An error has occurred during creating adapter for MQTT v3 client", e); + } + } + +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java index 13bc624216..8b8c186360 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java @@ -31,25 +31,25 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.TlsException; import org.apache.nifi.ssl.SSLContextService; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Properties; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase; +import static org.apache.commons.lang3.StringUtils.EMPTY; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1; @@ -57,62 +57,67 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor { - public static int DISCONNECT_TIMEOUT = 5000; + private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs"; protected ComponentLog logger; - protected IMqttClient mqttClient; - protected volatile String broker; - protected volatile String brokerUri; - protected volatile String clientID; - protected MqttConnectOptions connOpts; - protected MemoryPersistence persistence = new MemoryPersistence(); + + protected MqttClientProperties clientProperties; + + protected MqttClientFactory mqttClientFactory = new MqttClientFactory(); + protected MqttClient mqttClient; public ProcessSessionFactory processSessionFactory; - public static final Validator QOS_VALIDATOR = new Validator() { + public static final Validator QOS_VALIDATOR = (subject, input, context) -> { + Integer inputInt = Integer.parseInt(input); + if (inputInt < 0 || inputInt > 2) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2.").build(); + } + return new ValidationResult.Builder().subject(subject).valid(true).build(); + }; - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - Integer inputInt = Integer.parseInt(input); - if (inputInt < 0 || inputInt > 2) { - return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2").build(); + public static final Validator BROKER_VALIDATOR = (subject, input, context) -> { + try { + URI brokerURI = new URI(input); + if (!EMPTY.equals(brokerURI.getPath())) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is: " + brokerURI.getPath()).build(); } + if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, brokerURI.getScheme())) { + return new ValidationResult.Builder().subject(subject).valid(false) + .explanation("scheme is invalid. Supported schemes are: " + getSupportedSchemeList()).build(); + } + } catch (URISyntaxException e) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build(); + } + return new ValidationResult.Builder().subject(subject).valid(true).build(); + }; + + private static String getSupportedSchemeList() { + return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new)); + } + + public static final Validator RETAIN_VALIDATOR = (subject, input, context) -> { + if ("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)) { return new ValidationResult.Builder().subject(subject).valid(true).build(); + } else { + return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false) + .validate(subject, input, context); } + }; - public static final Validator BROKER_VALIDATOR = new Validator() { - - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - try{ - URI brokerURI = new URI(input); - if (!"".equals(brokerURI.getPath())) { - return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build(); - } - if (!("tcp".equals(brokerURI.getScheme()) || "ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || "wss".equals(brokerURI.getScheme()))) { - return new ValidationResult.Builder().subject(subject).valid(false).explanation("only the 'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build(); - } - } catch (URISyntaxException e) { - return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build(); - } - return new ValidationResult.Builder().subject(subject).valid(true).build(); - } - }; - - public static final Validator RETAIN_VALIDATOR = new Validator() { - - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - if("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)){ - return new ValidationResult.Builder().subject(subject).valid(true).build(); - } else{ - return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false) - .validate(subject, input, context); - } - - } - }; + public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder() + .name("MQTT Specification Version") + .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.") + .allowableValues( + ALLOWABLE_VALUE_MQTT_VERSION_AUTO, + ALLOWABLE_VALUE_MQTT_VERSION_500, + ALLOWABLE_VALUE_MQTT_VERSION_311, + ALLOWABLE_VALUE_MQTT_VERSION_310 + ) + .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue()) + .required(true) + .build(); public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder() .name("Broker URI") @@ -123,7 +128,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces .addValidator(BROKER_VALIDATOR) .build(); - public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder() .name("Client ID") .description("MQTT client ID to use. If not set, a UUID will be generated.") @@ -155,7 +159,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder() .name("Last Will Topic") .description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.") @@ -174,7 +177,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces .name("Last Will Retain") .description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.") .required(false) - .allowableValues("true","false") + .allowableValues("true", "false") .build(); public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder() @@ -190,7 +193,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder() .name("Session state") - .description("Whether to start afresh or resume previous flows. See the allowable value descriptions for more details.") + .description("Whether to start a fresh or resume previous flows. See the allowable value descriptions for more details.") .required(true) .allowableValues( ALLOWABLE_VALUE_CLEAN_SESSION_TRUE, @@ -199,16 +202,13 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces .defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue()) .build(); - public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder() - .name("MQTT Specification Version") - .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.") - .allowableValues( - ALLOWABLE_VALUE_MQTT_VERSION_AUTO, - ALLOWABLE_VALUE_MQTT_VERSION_311, - ALLOWABLE_VALUE_MQTT_VERSION_310 - ) - .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue()) - .required(true) + public static final PropertyDescriptor PROP_SESSION_EXPIRY_INTERVAL = new PropertyDescriptor.Builder() + .name("Session Expiry Interval") + .description("After this interval the broker will expire the client and clear the session state.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .dependsOn(PROP_MQTT_VERSION, ALLOWABLE_VALUE_MQTT_VERSION_500) + .dependsOn(PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE) + .defaultValue(DEFAULT_SESSION_EXPIRY_INTERVAL) .build(); public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder() @@ -232,8 +232,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); - public static List getAbstractPropertyDescriptors(){ - final List descriptors = new ArrayList(); + public static List getAbstractPropertyDescriptors() { + final List descriptors = new ArrayList<>(); descriptors.add(PROP_BROKER_URI); descriptors.add(PROP_CLIENTID); descriptors.add(PROP_USERNAME); @@ -244,6 +244,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces descriptors.add(PROP_LAST_WILL_RETAIN); descriptors.add(PROP_LAST_WILL_QOS); descriptors.add(PROP_CLEAN_SESSION); + descriptors.add(PROP_SESSION_EXPIRY_INTERVAL); descriptors.add(PROP_MQTT_VERSION); descriptors.add(PROP_CONN_TIMEOUT); descriptors.add(PROP_KEEP_ALIVE_INTERVAL); @@ -257,7 +258,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet(); if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) { - results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set").build()); + results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set.").build()); } final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet(); @@ -269,7 +270,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces // If any of the Last Will Properties are set if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) { // And any are not set - if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)){ + if (!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)) { // Then mark as invalid results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " + "set, all must be set.").build()); @@ -289,88 +290,34 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces return results; } - public static Properties transformSSLContextService(SSLContextService sslContextService){ - Properties properties = new Properties(); - if (sslContextService.getSslAlgorithm() != null) { - properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm()); - } - if (sslContextService.getKeyStoreFile() != null) { - properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile()); - } - if (sslContextService.getKeyStorePassword() != null) { - properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword()); - } - if (sslContextService.getKeyStoreType() != null) { - properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType()); - } - if (sslContextService.getTrustStoreFile() != null) { - properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile()); - } - if (sslContextService.getTrustStorePassword() != null) { - properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword()); - } - if (sslContextService.getTrustStoreType() != null) { - properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType()); - } - return properties; + protected void onScheduled(final ProcessContext context) { + clientProperties = getMqttClientProperties(context); } - protected void onScheduled(final ProcessContext context){ - broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue(); - brokerUri = broker.endsWith("/") ? broker : broker + "/"; - clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue(); + protected void stopClient() { + // Since client is created in the onTrigger method it can happen that it never will be created because of an initialization error. + // We are preventing additional nullPtrException here, but the clean solution would be to create the client in the onScheduled method. + if (mqttClient != null) { + try { + logger.info("Disconnecting client"); + mqttClient.disconnect(); + } catch (Exception e) { + logger.error("Error disconnecting MQTT client", e); + } - if (clientID == null) { - clientID = UUID.randomUUID().toString(); - } + try { + logger.info("Closing client"); + mqttClient.close(); + } catch (Exception e) { + logger.error("Error closing MQTT client", e); + } - connOpts = new MqttConnectOptions(); - connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean()); - connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger()); - connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger()); - connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger()); - - PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE); - if (sslProp.isSet()) { - Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService()); - connOpts.setSSLProperties(sslProps); - } - - PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC); - if (lastWillTopicProp.isSet()){ - String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue(); - PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN); - Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger(); - connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false); - } - - - PropertyValue usernameProp = context.getProperty(PROP_USERNAME); - if(usernameProp.isSet()) { - connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue()); - connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray()); - } - } - - protected void onStopped() { - try { - logger.info("Disconnecting client"); - mqttClient.disconnect(DISCONNECT_TIMEOUT); - } catch(MqttException me) { - logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me); - } - - try { - logger.info("Closing client"); - mqttClient.close(); mqttClient = null; - } catch (MqttException me) { - logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me); } } - protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { - return new MqttClient(broker, clientID, persistence); + protected MqttClient createMqttClient() throws TlsException { + return mqttClientFactory.create(clientProperties, getLogger()); } @@ -384,7 +331,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces onTrigger(context, session); session.commitAsync(); } catch (final Throwable t) { - getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); + getLogger().error("{} failed to process due to {}; rolling back session", this, t); session.rollback(true); throw t; } @@ -392,8 +339,52 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; - protected boolean isConnected(){ + protected boolean isConnected() { return (mqttClient != null && mqttClient.isConnected()); } + protected MqttClientProperties getMqttClientProperties(final ProcessContext context) { + final MqttClientProperties clientProperties = new MqttClientProperties(); + + try { + clientProperties.setBrokerUri(new URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid Broker URI", e); + } + + String clientId = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue(); + if (clientId == null) { + clientId = UUID.randomUUID().toString(); + } + clientProperties.setClientId(clientId); + + clientProperties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger())); + + clientProperties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean()); + clientProperties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS)); + + clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger()); + clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger()); + + final PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE); + if (sslProp.isSet()) { + final SSLContextService sslContextService = (SSLContextService) sslProp.asControllerService(); + clientProperties.setTlsConfiguration(sslContextService.createTlsConfiguration()); + } + + clientProperties.setLastWillTopic(context.getProperty(PROP_LAST_WILL_TOPIC).getValue()); + clientProperties.setLastWillMessage(context.getProperty(PROP_LAST_WILL_MESSAGE).getValue()); + final PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN); + clientProperties.setLastWillRetain(lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false); + clientProperties.setLastWillQos(context.getProperty(PROP_LAST_WILL_QOS).asInteger()); + + final PropertyValue usernameProp = context.getProperty(PROP_USERNAME); + if (usernameProp.isSet()) { + clientProperties.setUsername(usernameProp.evaluateAttributeExpressions().getValue()); + } + + clientProperties.setPassword(context.getProperty(PROP_PASSWORD).getValue()); + + return clientProperties; + } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java new file mode 100644 index 0000000000..a890616f5c --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +public interface MqttCallback { + void connectionLost(Throwable cause); + void messageArrived(ReceivedMqttMessage message); + void deliveryComplete(String token); +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java new file mode 100644 index 0000000000..f21d3e9242 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +public interface MqttClient { + + /** + * Determines if this client is currently connected to an MQTT broker. + * + * @return whether the client is connected. + */ + boolean isConnected(); + + /** + * Connects the client to an MQTT broker. + */ + void connect(); + + /** + * Disconnects client from an MQTT broker. + */ + void disconnect(); + + /** + * Releases all resource associated with the client. After the client has + * been closed it cannot be reused. For instance attempts to connect will fail. + */ + void close(); + + /** + * Publishes a message to a topic on the MQTT broker. + * + * @param topic to deliver the message to, for example "pipe-1/flow-rate" + * @param message to deliver to the MQTT broker + */ + void publish(String topic, StandardMqttMessage message); + + /** + * Subscribe to a topic. + * + * @param topicFilter the topic to subscribe to, which can include wildcards. + * @param qos the maximum quality of service at which to subscribe. Messages + * published at a lower quality of service will be received at the published + * QoS. Messages published at a higher quality of service will be received using + * the QoS specified on the subscribe. + */ + void subscribe(String topicFilter, int qos); + + /** + * Sets a callback listener to use for events that happen asynchronously. + * + * @param callback for matching events + */ + void setCallback(MqttCallback callback); +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java new file mode 100644 index 0000000000..630385f5e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.mqtt.adapters.HiveMqV5ClientAdapter; +import org.apache.nifi.processors.mqtt.adapters.PahoMqttClientAdapter; +import org.apache.nifi.security.util.TlsException; + +public class MqttClientFactory { + public MqttClient create(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException { + switch (clientProperties.getMqttVersion()) { + case MQTT_VERSION_3_AUTO: + case MQTT_VERSION_3_1: + case MQTT_VERSION_3_1_1: + return new PahoMqttClientAdapter(clientProperties, logger); + case MQTT_VERSION_5_0: + return new HiveMqV5ClientAdapter(clientProperties, logger); + default: + throw new MqttException("Unsupported Mqtt version: " + clientProperties.getMqttVersion()); + } + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java new file mode 100644 index 0000000000..eecde9b1b2 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +import org.apache.nifi.security.util.TlsConfiguration; + +import java.net.URI; + +public class MqttClientProperties { + private URI brokerUri; + private String clientId; + + private MqttVersion mqttVersion; + + private int keepAliveInterval; + private int connectionTimeout; + + private boolean cleanSession; + private Long sessionExpiryInterval; + + private TlsConfiguration tlsConfiguration; + + private String lastWillTopic; + private String lastWillMessage; + private Boolean lastWillRetain; + private Integer lastWillQos; + + private String username; + private String password; + + public String getBroker() { + return brokerUri.toString(); + } + + public MqttProtocolScheme getScheme() { + return MqttProtocolScheme.valueOf(brokerUri.getScheme().toUpperCase()); + } + + public URI getBrokerUri() { + return brokerUri; + } + + public void setBrokerUri(URI brokerUri) { + this.brokerUri = brokerUri; + } + + public String getClientId() { + return clientId; + } + + public void setClientId(String clientId) { + this.clientId = clientId; + } + + public MqttVersion getMqttVersion() { + return mqttVersion; + } + + public void setMqttVersion(MqttVersion mqttVersion) { + this.mqttVersion = mqttVersion; + } + + public int getKeepAliveInterval() { + return keepAliveInterval; + } + + public void setKeepAliveInterval(int keepAliveInterval) { + this.keepAliveInterval = keepAliveInterval; + } + + public int getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public boolean isCleanSession() { + return cleanSession; + } + + public void setCleanSession(boolean cleanSession) { + this.cleanSession = cleanSession; + } + + public Long getSessionExpiryInterval() { + return sessionExpiryInterval; + } + + public void setSessionExpiryInterval(Long sessionExpiryInterval) { + this.sessionExpiryInterval = sessionExpiryInterval; + } + + public TlsConfiguration getTlsConfiguration() { + return tlsConfiguration; + } + + public void setTlsConfiguration(TlsConfiguration tlsConfiguration) { + this.tlsConfiguration = tlsConfiguration; + } + + public String getLastWillTopic() { + return lastWillTopic; + } + + public void setLastWillTopic(String lastWillTopic) { + this.lastWillTopic = lastWillTopic; + } + + public String getLastWillMessage() { + return lastWillMessage; + } + + public void setLastWillMessage(String lastWillMessage) { + this.lastWillMessage = lastWillMessage; + } + + public Boolean getLastWillRetain() { + return lastWillRetain; + } + + public void setLastWillRetain(Boolean lastWillRetain) { + this.lastWillRetain = lastWillRetain; + } + + public Integer getLastWillQos() { + return lastWillQos; + } + + public void setLastWillQos(Integer lastWillQos) { + this.lastWillQos = lastWillQos; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java index a29e6ff616..072708f6da 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java @@ -18,7 +18,11 @@ package org.apache.nifi.processors.mqtt.common; import org.apache.nifi.components.AllowableValue; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; + +import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1; +import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1_1; +import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_AUTO; +import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_5_0; public class MqttConstants { @@ -66,15 +70,16 @@ public class MqttConstants { ------------------------------------------ */ public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO = - new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT), - "AUTO", + new AllowableValue(String.valueOf(MQTT_VERSION_3_AUTO.getVersionCode()), MQTT_VERSION_3_AUTO.getDisplayName(), "Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker"); + public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_500 = + new AllowableValue(String.valueOf(MQTT_VERSION_5_0.getVersionCode()), MQTT_VERSION_5_0.getDisplayName()); + public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 = - new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1), - "v3.1.1"); + new AllowableValue(String.valueOf(MQTT_VERSION_3_1_1.getVersionCode()), MQTT_VERSION_3_1_1.getDisplayName()); public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 = - new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1), - "v3.1.0"); + new AllowableValue(String.valueOf(MQTT_VERSION_3_1.getVersionCode()), MQTT_VERSION_3_1.getDisplayName()); + } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java new file mode 100644 index 0000000000..cee1599136 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +public class MqttException extends RuntimeException { + + public MqttException(String message) { + super(message); + } + + public MqttException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java new file mode 100644 index 0000000000..1474694223 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +public enum MqttProtocolScheme { + TCP, + SSL, + WS, + WSS +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java new file mode 100644 index 0000000000..0aec86f138 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; + +public enum MqttVersion { + MQTT_VERSION_3_AUTO(MqttConnectOptions.MQTT_VERSION_DEFAULT, "v3 AUTO"), + MQTT_VERSION_3_1(MqttConnectOptions.MQTT_VERSION_3_1, "v3.1.0"), + MQTT_VERSION_3_1_1(MqttConnectOptions.MQTT_VERSION_3_1_1, "v3.1.1"), + MQTT_VERSION_5_0(5, "v5.0"); + + private final int versionCode; + private final String displayName; + + MqttVersion(int versionCode, String displayName) { + this.versionCode = versionCode; + this.displayName = displayName; + } + + public int getVersionCode() { + return versionCode; + } + + public String getDisplayName() { + return displayName; + } + + public static MqttVersion fromVersionCode(int versionCode) { + for (MqttVersion version : values()) { + if (version.getVersionCode() == versionCode) { + return version; + } + } + throw new IllegalArgumentException("Unable to map MqttVersionCode from version code: " + versionCode); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java new file mode 100644 index 0000000000..abe1fe6281 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mqtt.common; + +/** + * Represents a received MQTT message + */ +public class ReceivedMqttMessage extends StandardMqttMessage { + + private String topic; + + public ReceivedMqttMessage(byte[] payload, int qos, boolean retained, String topic) { + super(payload, qos, retained); + this.topic = topic; + } + + public String getTopic() { + return topic; + } + + public boolean isDuplicate() { + return false; + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java similarity index 64% rename from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java rename to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java index d5e63c789b..2a232eab86 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java @@ -14,29 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.nifi.processors.mqtt.common; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -public class MQTTQueueMessage { - private String topic; - +/** + * Represents a MQTT message. + */ +public class StandardMqttMessage { private byte[] payload; - private int qos = 1; - private boolean retained = false; - private boolean duplicate = false; + private int qos; + private boolean retained; - public MQTTQueueMessage(String topic, MqttMessage message) { - this.topic = topic; - payload = message.getPayload(); - qos = message.getQos(); - retained = message.isRetained(); - duplicate = message.isDuplicate(); - } - - public String getTopic() { - return topic; + public StandardMqttMessage(byte[] payload, int qos, boolean retained) { + this.payload = payload; + this.qos = qos; + this.retained = retained; } public byte[] getPayload() { @@ -50,8 +41,4 @@ public class MQTTQueueMessage { public boolean isRetained() { return retained; } - - public boolean isDuplicate() { - return duplicate; - } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java index f66b4dd402..81ea6d7db1 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java @@ -18,8 +18,10 @@ package org.apache.nifi.processors.mqtt; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.apache.nifi.processors.mqtt.common.MqttClient; import org.apache.nifi.processors.mqtt.common.MqttTestClient; +import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; +import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.security.util.SslContextFactory; @@ -29,10 +31,6 @@ import org.apache.nifi.security.util.TlsException; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -59,8 +57,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon { } @Override - public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { - mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber); + protected MqttClient createMqttClient() { + mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber); return mqttTestClient; } } @@ -111,10 +109,10 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon { public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException { testRunner.run(1, false); ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor(); - MQTTQueueMessage mock = mock(MQTTQueueMessage.class); + ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class); when(mock.getPayload()).thenReturn(new byte[0]); when(mock.getTopic()).thenReturn("testTopic"); - BlockingQueue mqttQueue = getMqttQueue(processor); + BlockingQueue mqttQueue = getMqttQueue(processor); mqttQueue.add(mock); ProcessSession session = testRunner.getProcessSessionFactory().createSession(); @@ -131,11 +129,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon { } @Override - public void internalPublish(final MqttMessage message, final String topicName) { - try { - mqttTestClient.publish(topicName, message); - } catch (MqttException e) { - throw new RuntimeException(e); - } + public void internalPublish(final StandardMqttMessage message, final String topicName) { + mqttTestClient.publish(topicName, message); } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java index 1755365c1c..41181c65ed 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java @@ -17,13 +17,12 @@ package org.apache.nifi.processors.mqtt; -import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.apache.nifi.processors.mqtt.common.MqttClient; +import org.apache.nifi.processors.mqtt.common.MqttException; import org.apache.nifi.processors.mqtt.common.MqttTestClient; +import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; import org.apache.nifi.util.TestRunners; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.BeforeEach; import java.util.Arrays; @@ -34,11 +33,12 @@ public class TestPublishMQTT extends TestPublishMqttCommon { @Override public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { - MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage; - assertEquals(Arrays.toString(payload), Arrays.toString(mqttQueueMessage.getPayload())); - assertEquals(qos, mqttQueueMessage.getQos()); - assertEquals(retain, mqttQueueMessage.isRetained()); - assertEquals(topic, mqttQueueMessage.getTopic()); + StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage(); + String lastPublishedTopic = mqttTestClient.getLastPublishedTopic(); + assertEquals(Arrays.toString(payload), Arrays.toString(lastPublishedMessage.getPayload())); + assertEquals(qos, lastPublishedMessage.getQos()); + assertEquals(retain, lastPublishedMessage.isRetained()); + assertEquals(topic, lastPublishedTopic); } private MqttTestClient mqttTestClient; @@ -50,8 +50,8 @@ public class TestPublishMQTT extends TestPublishMqttCommon { } @Override - public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { - mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher); + protected MqttClient createMqttClient() throws MqttException { + mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher); return mqttTestClient; } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java index b135dcfe44..91997061bc 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java @@ -17,24 +17,9 @@ package org.apache.nifi.processors.mqtt.common; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.IMqttMessageListener; -import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttPersistenceException; -import org.eclipse.paho.client.mqttv3.MqttSecurityException; -import org.eclipse.paho.client.mqttv3.MqttTopic; - import java.util.concurrent.atomic.AtomicBoolean; -public class MqttTestClient implements IMqttClient { - - public String serverURI; - public String clientId; +public class MqttTestClient implements MqttClient { public AtomicBoolean connected = new AtomicBoolean(false); @@ -42,233 +27,66 @@ public class MqttTestClient implements IMqttClient { public ConnectType type; public enum ConnectType {Publisher, Subscriber} - public MQTTQueueMessage publishedMessage; + private StandardMqttMessage lastPublishedMessage; + private String lastPublishedTopic; public String subscribedTopic; public int subscribedQos; - public MqttTestClient(String serverURI, String clientId, ConnectType type) throws MqttException { - this.serverURI = serverURI; - this.clientId = clientId; + public MqttTestClient(ConnectType type) { this.type = type; } - @Override - public void connect() throws MqttSecurityException, MqttException { - connected.set(true); - } - - @Override - public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException { - connected.set(true); - } - - @Override - public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException { - return null; - } - - @Override - public void disconnect() throws MqttException { - connected.set(false); - } - - @Override - public void disconnect(long quiesceTimeout) throws MqttException { - connected.set(false); - } - - @Override - public void disconnectForcibly() throws MqttException { - connected.set(false); - } - - @Override - public void disconnectForcibly(long disconnectTimeout) throws MqttException { - connected.set(false); - } - - @Override - public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException { - connected.set(false); - } - - @Override - public void subscribe(String topicFilter) throws MqttException, MqttSecurityException { - subscribedTopic = topicFilter; - subscribedQos = -1; - } - - @Override - public void subscribe(String[] topicFilters) throws MqttException { - throw new UnsupportedOperationException("Multiple topic filters is not supported"); - } - - @Override - public void subscribe(String topicFilter, int qos) throws MqttException { - subscribedTopic = topicFilter; - subscribedQos = qos; - } - - @Override - public void subscribe(String[] topicFilters, int[] qos) throws MqttException { - throw new UnsupportedOperationException("Multiple topic filters is not supported"); - } - - @Override - public void subscribe(String s, IMqttMessageListener iMqttMessageListener) throws MqttException, MqttSecurityException { - - } - - @Override - public void subscribe(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException { - - } - - @Override - public void subscribe(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException { - - } - - @Override - public void subscribe(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException { - - } - - @Override - public IMqttToken subscribeWithResponse(String s) throws MqttException { - return null; - } - - @Override - public IMqttToken subscribeWithResponse(String s, IMqttMessageListener iMqttMessageListener) throws MqttException { - return null; - } - - @Override - public IMqttToken subscribeWithResponse(String s, int i) throws MqttException { - return null; - } - - @Override - public IMqttToken subscribeWithResponse(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException { - return null; - } - - @Override - public IMqttToken subscribeWithResponse(String[] strings) throws MqttException { - return null; - } - - @Override - public IMqttToken subscribeWithResponse(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException { - return null; - } - - @Override - public IMqttToken subscribeWithResponse(String[] strings, int[] ints) throws MqttException { - return null; - } - - @Override - public IMqttToken subscribeWithResponse(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException { - return null; - } - - @Override - public void unsubscribe(String topicFilter) throws MqttException { - subscribedTopic = ""; - subscribedQos = -2; - } - - @Override - public void unsubscribe(String[] topicFilters) throws MqttException { - throw new UnsupportedOperationException("Multiple topic filters is not supported"); - } - - @Override - public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException { - MqttMessage message = new MqttMessage(payload); - message.setQos(qos); - message.setRetained(retained); - switch (type) { - case Publisher: - publishedMessage = new MQTTQueueMessage(topic, message); - break; - case Subscriber: - try { - mqttCallback.messageArrived(topic, message); - } catch (Exception e) { - throw new MqttException(e); - } - break; - } - } - - @Override - public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException { - switch (type) { - case Publisher: - publishedMessage = new MQTTQueueMessage(topic, message); - break; - case Subscriber: - try { - mqttCallback.messageArrived(topic, message); - } catch (Exception e) { - throw new MqttException(e); - } - break; - } - } - - @Override - public void setCallback(MqttCallback callback) { - this.mqttCallback = callback; - } - - @Override - public MqttTopic getTopic(String topic) { - return null; - } - @Override public boolean isConnected() { return connected.get(); } @Override - public String getClientId() { - return clientId; + public void connect() { + connected.set(true); } @Override - public String getServerURI() { - return serverURI; + public void disconnect() { + connected.set(false); } @Override - public IMqttDeliveryToken[] getPendingDeliveryTokens() { - return new IMqttDeliveryToken[0]; - } - - @Override - public void setManualAcks(boolean b) { + public void close() { } @Override - public void reconnect() throws MqttException { - + public void publish(String topic, StandardMqttMessage message) { + switch (type) { + case Publisher: + lastPublishedMessage = message; + lastPublishedTopic = topic; + break; + case Subscriber: + mqttCallback.messageArrived(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic)); + break; + } } @Override - public void messageArrivedComplete(int i, int i1) throws MqttException { - + public void subscribe(String topicFilter, int qos) { + subscribedTopic = topicFilter; + subscribedQos = qos; } @Override - public void close() throws MqttException { + public void setCallback(MqttCallback callback) { + this.mqttCallback = callback; + } + public StandardMqttMessage getLastPublishedMessage() { + return lastPublishedMessage; + } + + public String getLastPublishedTopic() { + return lastPublishedTopic; } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java index 0eec42ce71..7cbaa11c8d 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java @@ -28,8 +28,6 @@ import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.eclipse.paho.client.mqttv3.IMqttClient; -import org.eclipse.paho.client.mqttv3.MqttMessage; import org.junit.jupiter.api.Test; import java.lang.reflect.Field; @@ -66,7 +64,7 @@ public abstract class TestConsumeMqttCommon { private static final int LEAST_ONE = 1; private static final int EXACTLY_ONCE = 2; - public abstract void internalPublish(MqttMessage message, String topicName); + public abstract void internalPublish(StandardMqttMessage message, String topicName); @Test public void testClientIDConfiguration() { @@ -295,10 +293,8 @@ public abstract class TestConsumeMqttCommon { testRunner.assertValid(); - MqttMessage innerMessage = new MqttMessage(); - innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array()); - innerMessage.setQos(2); - MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage); + final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array(); + ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, "testTopic"); ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); consumeMQTT.onScheduled(testRunner.getProcessContext()); @@ -313,7 +309,7 @@ public abstract class TestConsumeMqttCommon { Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue"); f.setAccessible(true); @SuppressWarnings("unchecked") - LinkedBlockingQueue queue = (LinkedBlockingQueue) f.get(consumeMQTT); + LinkedBlockingQueue queue = (LinkedBlockingQueue) f.get(consumeMQTT); queue.add(testMessage); consumeMQTT.onUnscheduled(testRunner.getProcessContext()); @@ -551,7 +547,7 @@ public abstract class TestConsumeMqttCommon { private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException { Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient"); f.setAccessible(true); - IMqttClient mqttClient = (IMqttClient) f.get(processor); + MqttClient mqttClient = (MqttClient) f.get(processor); return mqttClient.isConnected(); } @@ -563,10 +559,10 @@ public abstract class TestConsumeMqttCommon { } @SuppressWarnings("unchecked") - public static BlockingQueue getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException { + public static BlockingQueue getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException { Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue"); mqttQueueField.setAccessible(true); - return (BlockingQueue) mqttQueueField.get(consumeMQTT); + return (BlockingQueue) mqttQueueField.get(consumeMQTT); } public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { @@ -585,11 +581,7 @@ public abstract class TestConsumeMqttCommon { } private void publishMessage(final String payload, final int qos) { - final MqttMessage message = new MqttMessage(); - message.setPayload(payload.getBytes(StandardCharsets.UTF_8)); - message.setQos(qos); - message.setRetained(false); - + final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false); internalPublish(message, "testTopic"); } }