NIFI-10251 Add v5 protocol support for existing MQTT processors

This closes #6225.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2022-07-07 17:54:11 +02:00 committed by Peter Turcsanyi
parent c4843fa3a8
commit 4d4a5ca4be
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
22 changed files with 1765 additions and 584 deletions

View File

@ -1565,6 +1565,275 @@ The following binary components are provided under the Apache Software License v
* http://tomcat.apache.org/native-doc/ * http://tomcat.apache.org/native-doc/
* https://svn.apache.org/repos/asf/tomcat/native/ * https://svn.apache.org/repos/asf/tomcat/native/
(ASLv2) The Netty Project (4.1.77.Final)
The following NOTICE information applies:
netty/netty
Copyright 2014 The Netty Project
-------------------------------------------------------------------------------
The Netty Project
=================
Please visit the Netty web site for more information:
* https://netty.io/
Copyright 2014 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* https://www.slf4j.org/
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* NOTICE:
* license/NOTICE.harmony.txt
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* https://archive.apache.org/dist/harmony/
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product optionally depends on 'zstd-jni', a zstd-jni Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.zstd-jni.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/luben/zstd-jni
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* https://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jboss-remoting/jboss-marshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* https://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* https://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at:
* LICENSE:
* license/LICENSE.hyper-hpack.txt (MIT License)
* HOMEPAGE:
* https://github.com/python-hyper/hpack/
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at:
* LICENSE:
* license/LICENSE.nghttp2-hpack.txt (MIT License)
* HOMEPAGE:
* https://github.com/nghttp2/nghttp2/
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
* LICENSE:
* license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/takari/maven-wrapper
This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS.
This private header is also used by Apple's open source
mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
* LICENSE:
* license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
* HOMEPAGE:
* https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
This product optionally depends on 'Brotli4j', Brotli compression and
decompression for Java., which can be obtained at:
* LICENSE:
* license/LICENSE.brotli4j.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/hyperxpro/Brotli4j
(ASLv2) Error Prone (ASLv2) Error Prone
The following NOTICE information applies: The following NOTICE information applies:
Copyright 2017 Google Inc. Copyright 2017 Google Inc.
@ -1958,6 +2227,36 @@ The following binary components are provided under the Apache Software License v
Copyright (c) 2014-2019 Appsicle Copyright (c) 2014-2019 Appsicle
Copyright (c) 2019-2020 QuestDB Copyright (c) 2019-2020 QuestDB
(ASLv2) HiveMQ MQTT Client
The following NOTICE information applies:
HiveMQ MQTT Client
Copyright 2018-2022 HiveMQ and the HiveMQ Community
(ASLv2) ReactiveX/RxJava
The following NOTICE information applies:
ReactiveX/RxJava
Copyright 2018-present RxJava Contributors
(ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core)
The following NOTICE information applies:
JCTools/JCTools
Copyright
(ASLv2) JetBrains/java-annotations
The following NOTICE information applies:
JetBrains/java-annotations
Copyright 2000-2016 JetBrains s.r.o.
(ASLv2) google/dagger
The following NOTICE information applies:
google/dagger
Copyright 2012 The Dagger Authors
(ASLv2) atinject (javax.inject:javax.inject)
The following NOTICE information applies:
atinject
Copyright
************************ ************************
Common Development and Distribution License 1.1 Common Development and Distribution License 1.1
************************ ************************

View File

@ -41,6 +41,305 @@ The following binary components are provided under the Apache Software License v
in some artifacts (usually source distributions); but is always available in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses. from the source code management (SCM) system project uses.
(ASLv2) HiveMQ MQTT Client
The following NOTICE information applies:
HiveMQ MQTT Client
Copyright 2018-2022 HiveMQ and the HiveMQ Community
(ASLv2) ReactiveX/RxJava
The following NOTICE information applies:
ReactiveX/RxJava
Copyright 2018-present RxJava Contributors
(ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core)
The following NOTICE information applies:
JCTools/JCTools
Copyright
(ASLv2) JetBrains/java-annotations
The following NOTICE information applies:
JetBrains/java-annotations
Copyright 2000-2016 JetBrains s.r.o.
(ASLv2) google/dagger
The following NOTICE information applies:
google/dagger
Copyright 2012 The Dagger Authors
(ASLv2) atinject (javax.inject:javax.inject)
The following NOTICE information applies:
atinject
Copyright
(ASLv2) The Netty Project
The following NOTICE information applies:
netty/netty
Copyright 2014 The Netty Project
-------------------------------------------------------------------------------
The Netty Project
=================
Please visit the Netty web site for more information:
* https://netty.io/
Copyright 2014 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* https://www.slf4j.org/
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* NOTICE:
* license/NOTICE.harmony.txt
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* https://archive.apache.org/dist/harmony/
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product optionally depends on 'zstd-jni', a zstd-jni Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.zstd-jni.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/luben/zstd-jni
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* https://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jboss-remoting/jboss-marshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* https://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* https://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at:
* LICENSE:
* license/LICENSE.hyper-hpack.txt (MIT License)
* HOMEPAGE:
* https://github.com/python-hyper/hpack/
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at:
* LICENSE:
* license/LICENSE.nghttp2-hpack.txt (MIT License)
* HOMEPAGE:
* https://github.com/nghttp2/nghttp2/
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
* LICENSE:
* license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/takari/maven-wrapper
This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS.
This private header is also used by Apple's open source
mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
* LICENSE:
* license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
* HOMEPAGE:
* https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
This product optionally depends on 'Brotli4j', Brotli compression and
decompression for Java., which can be obtained at:
* LICENSE:
* license/LICENSE.brotli4j.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/hyperxpro/Brotli4j
************************ ************************
Eclipse Public License 1.0 Eclipse Public License 1.0
************************ ************************
@ -48,3 +347,11 @@ Eclipse Public License 1.0
The following binary components are provided under the Eclipse Public License 1.0. See project link for details. The following binary components are provided under the Eclipse Public License 1.0. See project link for details.
(EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0 - https://github.com/eclipse/paho.mqtt.java) (EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0 - https://github.com/eclipse/paho.mqtt.java)
*****************
Public Domain
*****************
The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details.
(CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 - http://www.reactive-streams.org/)

View File

@ -48,6 +48,11 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId> <artifactId>nifi-record</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
<!-- External dependencies --> <!-- External dependencies -->
<dependency> <dependency>
@ -55,6 +60,11 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version> <version>1.2.2</version>
</dependency> </dependency>
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.3.0</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
@ -84,23 +94,4 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/integration/TestConsumeMQTT.java</exclude>
<exclude>**/integration/TestConsumeMqttSSL.java</exclude>
<exclude>**/integration/TestPublishAndSubscribeMqttIntegration.java</exclude>
<exclude>**/integration/TestPublishMQTT.java</exclude>
<exclude>**/integration/TestPublishMqttSSL.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -42,10 +42,11 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; import org.apache.nifi.processors.mqtt.common.MqttCallback;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordReaderFactory;
@ -58,10 +59,6 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
@ -209,7 +206,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
private volatile String topicFilter; private volatile String topicFilter;
private final AtomicBoolean scheduled = new AtomicBoolean(false); private final AtomicBoolean scheduled = new AtomicBoolean(false);
private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue; private volatile LinkedBlockingQueue<ReceivedMqttMessage> mqttQueue;
public static final Relationship REL_MESSAGE = new Relationship.Builder() public static final Relationship REL_MESSAGE = new Relationship.Builder()
.name("Message") .name("Message")
@ -226,7 +223,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
private static final List<PropertyDescriptor> descriptors; private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships; private static final Set<Relationship> relationships;
static{ static {
final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors(); final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors();
innerDescriptorsList.add(PROP_GROUPID); innerDescriptorsList.add(PROP_GROUPID);
innerDescriptorsList.add(PROP_TOPIC_FILTER); innerDescriptorsList.add(PROP_TOPIC_FILTER);
@ -238,7 +235,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
innerDescriptorsList.add(MESSAGE_DEMARCATOR); innerDescriptorsList.add(MESSAGE_DEMARCATOR);
descriptors = Collections.unmodifiableList(innerDescriptorsList); descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>(); final Set<Relationship> innerRelationshipsSet = new HashSet<>();
innerRelationshipsSet.add(REL_MESSAGE); innerRelationshipsSet.add(REL_MESSAGE);
innerRelationshipsSet.add(REL_PARSE_FAILURE); innerRelationshipsSet.add(REL_PARSE_FAILURE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet); relationships = Collections.unmodifiableSet(innerRelationshipsSet);
@ -249,15 +246,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
// resize the receive buffer, but preserve data // resize the receive buffer, but preserve data
if (descriptor == PROP_MAX_QUEUE_SIZE) { if (descriptor == PROP_MAX_QUEUE_SIZE) {
// it's a mandatory integer, never null // it's a mandatory integer, never null
int newSize = Integer.valueOf(newValue); int newSize = Integer.parseInt(newValue);
if (mqttQueue != null) { if (mqttQueue != null) {
int msgPending = mqttQueue.size(); int msgPending = mqttQueue.size();
if (msgPending > newSize) { if (msgPending > newSize) {
logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", newSize, msgPending);
new Object[]{newSize, msgPending});
return; return;
} }
LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize); LinkedBlockingQueue<ReceivedMqttMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
mqttQueue.drainTo(newBuffer); mqttQueue.drainTo(newBuffer);
mqttQueue = newBuffer; mqttQueue = newBuffer;
} }
@ -297,15 +293,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
final boolean readerIsSet = context.getProperty(RECORD_READER).isSet(); final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet(); final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
if((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) { if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false) results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("Both Record Reader and Writer must be set when used").build()); .explanation("both Record Reader and Writer must be set when used.").build());
} }
final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet(); final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
if(readerIsSet && demarcatorIsSet) { if (readerIsSet && demarcatorIsSet) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false) results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("You cannot use both a demarcator and a Reader/Writer").build()); .explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
} }
return results; return results;
@ -346,17 +342,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
public void onUnscheduled(final ProcessContext context) { public void onUnscheduled(final ProcessContext context) {
scheduled.set(false); scheduled.set(false);
synchronized (this) { synchronized (this) {
super.onStopped(); stopClient();
} }
} }
@OnStopped @OnStopped
public void onStopped(final ProcessContext context) throws IOException { public void onStopped(final ProcessContext context) {
if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) { if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
logger.info("Finishing processing leftover messages"); logger.info("Finishing processing leftover messages");
ProcessSession session = processSessionFactory.createSession(); ProcessSession session = processSessionFactory.createSession();
if(context.getProperty(RECORD_READER).isSet()) { if (context.getProperty(RECORD_READER).isSet()) {
transferQueueRecord(context, session); transferQueueRecord(context, session);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
transferQueueDemarcator(context, session); transferQueueDemarcator(context, session);
@ -364,7 +360,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
transferQueue(session); transferQueue(session);
} }
} else { } else {
if (mqttQueue!= null && !mqttQueue.isEmpty()){ if (mqttQueue != null && !mqttQueue.isEmpty()) {
throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " + throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " +
"clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages " + "clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages " +
"in the MQTT internal queue cannot finish processing until until the processor is triggered to run."); "in the MQTT internal queue cannot finish processing until until the processor is triggered to run.");
@ -375,7 +371,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final boolean isScheduled = scheduled.get(); final boolean isScheduled = scheduled.get();
if (!isConnected() && isScheduled){ if (!isConnected() && isScheduled) {
synchronized (this) { synchronized (this) {
if (!isConnected()) { if (!isConnected()) {
initializeClient(context); initializeClient(context);
@ -402,42 +398,27 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
// non-null but not connected, so we need to handle each case and only create a new client when it is null // non-null but not connected, so we need to handle each case and only create a new client when it is null
try { try {
if (mqttClient == null) { if (mqttClient == null) {
logger.debug("Creating client"); mqttClient = createMqttClient();
mqttClient = createMqttClient(broker, clientID, persistence);
mqttClient.setCallback(this); mqttClient.setCallback(this);
} }
if (!mqttClient.isConnected()) { if (!mqttClient.isConnected()) {
logger.debug("Connecting client"); mqttClient.connect();
mqttClient.connect(connOpts);
mqttClient.subscribe(topicPrefix + topicFilter, qos); mqttClient.subscribe(topicPrefix + topicFilter, qos);
} }
} catch (MqttException e) { } catch (Exception e) {
logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e); logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{clientProperties.getBroker()}, e);
mqttClient = null; // prevent stucked processor when subscribe fails
context.yield(); context.yield();
} }
} }
private void transferQueue(ProcessSession session){ private void transferQueue(ProcessSession session) {
while (!mqttQueue.isEmpty()) { while (!mqttQueue.isEmpty()) {
final MQTTQueueMessage mqttMessage = mqttQueue.peek(); final ReceivedMqttMessage mqttMessage = mqttQueue.peek();
FlowFile messageFlowfile = session.create();
Map<String, String> attrs = new HashMap<>(); final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage),
attrs.put(BROKER_ATTRIBUTE_KEY, broker); out -> out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()));
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
}
});
session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic())); session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
session.transfer(messageFlowfile, REL_MESSAGE); session.transfer(messageFlowfile, REL_MESSAGE);
@ -446,17 +427,16 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
} }
} }
private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){ private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session) {
final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8); final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
FlowFile messageFlowfile = session.create(); FlowFile messageFlowfile = session.create();
session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker); session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
messageFlowfile = session.append(messageFlowfile, out -> { messageFlowfile = session.append(messageFlowfile, out -> {
int i = 0; int i = 0;
while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) { while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
final MQTTQueueMessage mqttMessage = mqttQueue.poll(); final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()); out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
out.write(demarcator); out.write(demarcator);
session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false); session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
@ -469,41 +449,40 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
session.commitAsync(); session.commitAsync();
} }
private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) { private void transferFailure(final ProcessSession session, final ReceivedMqttMessage mqttMessage) {
FlowFile messageFlowfile = session.create(); final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage),
out -> out.write(mqttMessage.getPayload()));
Map<String, String> attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, broker);
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(mqttMessage.getPayload());
}
});
session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic())); session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
session.transfer(messageFlowfile, REL_PARSE_FAILURE); session.transfer(messageFlowfile, REL_PARSE_FAILURE);
session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false); session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
} }
private void transferQueueRecord(final ProcessContext context, final ProcessSession session){ private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, ReceivedMqttMessage mqttMessage) {
FlowFile messageFlowfile = session.create();
Map<String, String> attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
return messageFlowfile;
}
private void transferQueueRecord(final ProcessContext context, final ProcessSession session) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker); session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger(); final AtomicInteger recordCount = new AtomicInteger();
final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>(); final List<ReceivedMqttMessage> doneList = new ArrayList<>();
RecordSetWriter writer = null; RecordSetWriter writer = null;
boolean isWriterInitialized = false; boolean isWriterInitialized = false;
@ -511,8 +490,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
try { try {
while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) { while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
final MQTTQueueMessage mqttMessage = mqttQueue.poll(); final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
if(mqttMessage == null) { if (mqttMessage == null) {
break; break;
} }
@ -533,16 +512,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
Record record; Record record;
while ((record = reader.nextRecord()) != null) { while ((record = reader.nextRecord()) != null) {
if(!isWriterInitialized) { if (!isWriterInitialized) {
final RecordSchema recordSchema = record.getSchema(); final RecordSchema recordSchema = record.getSchema();
final OutputStream rawOut = session.write(flowFile); final OutputStream rawOut = session.write(flowFile);
RecordSchema writeSchema; RecordSchema writeSchema;
try { try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema); writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
final List<RecordField> fields = new ArrayList<>(); final List<RecordField> fields = new ArrayList<>(writeSchema.getFields());
fields.addAll(writeSchema.getFields());
fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType())); fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType())); fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
@ -562,7 +540,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
} }
try { try {
if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) { if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic()); record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
record.setValue(QOS_FIELD_KEY, mqttMessage.getQos()); record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained()); record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
@ -583,16 +561,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
} catch (final IOException | MalformedRecordException | SchemaValidationException e) { } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
logger.error("Failed to write message, sending to the parse failure relationship", e); logger.error("Failed to write message, sending to the parse failure relationship", e);
transferFailure(session, mqttMessage); transferFailure(session, mqttMessage);
continue;
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("Failed to write message, sending to the parse failure relationship", e); logger.error("Failed to write message, sending to the parse failure relationship", e);
transferFailure(session, mqttMessage); transferFailure(session, mqttMessage);
continue;
} }
} }
if(writer != null) { if (writer != null) {
final WriteResult writeResult = writer.finishRecordSet(); final WriteResult writeResult = writer.finishRecordSet();
attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount())); attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
@ -605,26 +581,26 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
// we try to add the messages back into the internal queue // we try to add the messages back into the internal queue
int numberOfMessages = 0; int numberOfMessages = 0;
for(MQTTQueueMessage done : doneList) { for (ReceivedMqttMessage done : doneList) {
try { try {
mqttQueue.offer(done, 1, TimeUnit.SECONDS); mqttQueue.offer(done, 1, TimeUnit.SECONDS);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
numberOfMessages++; numberOfMessages++;
if(getLogger().isDebugEnabled()) { if (getLogger().isDebugEnabled()) {
logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex); logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex);
} }
} }
} }
if(numberOfMessages > 0) { if (numberOfMessages > 0) {
logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages}); logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", numberOfMessages);
} }
throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e); throw new ProcessException("Could not process data received from the MQTT broker(s): " + clientProperties.getBroker(), e);
} finally { } finally {
closeWriter(writer); closeWriter(writer);
} }
if(recordCount.get() == 0) { if (recordCount.get() == 0) {
session.remove(flowFile); session.remove(flowFile);
return; return;
} }
@ -635,7 +611,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
final int count = recordCount.get(); final int count = recordCount.get();
session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false); session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false);
getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile}); getLogger().info("Successfully processed {} records for {}", count, flowFile);
} }
private void closeWriter(final RecordSetWriter writer) { private void closeWriter(final RecordSetWriter writer) {
@ -649,8 +625,9 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
} }
private String getTransitUri(String... appends) { private String getTransitUri(String... appends) {
StringBuilder stringBuilder = new StringBuilder(brokerUri); String broker = clientProperties.getBrokerUri().toString();
for(String append : appends) { StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ? broker : broker + "/");
for (String append : appends) {
stringBuilder.append(append); stringBuilder.append(append);
} }
return stringBuilder.toString(); return stringBuilder.toString();
@ -658,29 +635,34 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
} }
@Override @Override
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(ReceivedMqttMessage message) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
byte[] payload = message.getPayload(); byte[] payload = message.getPayload();
String text = new String(payload, "UTF-8"); String text = new String(payload, StandardCharsets.UTF_8);
if (StringUtils.isAsciiPrintable(text)) { if (StringUtils.isAsciiPrintable(text)) {
logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text}); logger.debug("Message arrived from topic {}. Payload: {}", message.getTopic(), text);
} else { } else {
logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length}); logger.debug("Message arrived from topic {}. Binary value of size {}", message.getTopic(), payload.length);
} }
} }
if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1, TimeUnit.SECONDS)) { try {
throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); if (!mqttQueue.offer(message, 1, TimeUnit.SECONDS)) {
throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
}
} catch (InterruptedException e) {
throw new MqttException("Failed to process message arrived from topic " + message.getTopic());
} }
} }
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(String token) {
logger.warn("Received MQTT 'delivery complete' message to subscriber: " + token); // Unlikely situation. Api uses the same callback for publisher and consumer as well.
// That's why we have this log message here to indicate something really messy thing happened.
logger.error("Received MQTT 'delivery complete' message to subscriber. Token: [{}]", token);
} }
} }

View File

@ -36,18 +36,15 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttCallback;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
@ -137,18 +134,18 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
@OnStopped @OnStopped
public void onStopped(final ProcessContext context) { public void onStopped(final ProcessContext context) {
synchronized (this) { synchronized (this) {
super.onStopped(); stopClient();
} }
} }
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowfile = session.get(); final FlowFile flowfile = session.get();
if (flowfile == null) { if (flowfile == null) {
return; return;
} }
if (!isConnected()){ if (!isConnected()) {
synchronized (this) { synchronized (this) {
if (!isConnected()) { if (!isConnected()) {
initializeClient(context); initializeClient(context);
@ -157,7 +154,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
} }
// get the MQTT topic // get the MQTT topic
String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue(); final String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
if (topic == null || topic.isEmpty()) { if (topic == null || topic.isEmpty()) {
logger.warn("Evaluation of the topic property returned null or evaluated to be empty, routing to failure"); logger.warn("Evaluation of the topic property returned null or evaluated to be empty, routing to failure");
@ -167,18 +164,11 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
// do the read // do the read
final byte[] messageContent = new byte[(int) flowfile.getSize()]; final byte[] messageContent = new byte[(int) flowfile.getSize()];
session.read(flowfile, new InputStreamCallback() { session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true));
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, messageContent, true);
}
});
int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger(); int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
final MqttMessage mqttMessage = new MqttMessage(messageContent); boolean retained = context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean();
mqttMessage.setQos(qos); final StandardMqttMessage mqttMessage = new StandardMqttMessage(messageContent, qos, retained);
mqttMessage.setPayload(messageContent);
mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean());
try { try {
final StopWatch stopWatch = new StopWatch(true); final StopWatch stopWatch = new StopWatch(true);
@ -188,9 +178,9 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
*/ */
mqttClient.publish(topic, mqttMessage); mqttClient.publish(topic, mqttMessage);
session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowfile, REL_SUCCESS); session.transfer(flowfile, REL_SUCCESS);
} catch(MqttException me) { } catch (MqttException me) {
logger.error("Failed to publish message.", me); logger.error("Failed to publish message.", me);
session.transfer(flowfile, REL_FAILURE); session.transfer(flowfile, REL_FAILURE);
} }
@ -201,35 +191,35 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
// non-null but not connected, so we need to handle each case and only create a new client when it is null // non-null but not connected, so we need to handle each case and only create a new client when it is null
try { try {
if (mqttClient == null) { if (mqttClient == null) {
logger.debug("Creating client"); mqttClient = createMqttClient();
mqttClient = createMqttClient(broker, clientID, persistence);
mqttClient.setCallback(this); mqttClient.setCallback(this);
} }
if (!mqttClient.isConnected()) { if (!mqttClient.isConnected()) {
logger.debug("Connecting client"); mqttClient.connect();
mqttClient.connect(connOpts);
} }
} catch (MqttException e) { } catch (Exception e) {
logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e); logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", clientProperties.getBroker(), e);
context.yield(); context.yield();
} }
} }
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
} }
@Override @Override
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(ReceivedMqttMessage message) {
logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}"); // Unlikely situation. Api uses the same callback for publisher and consumer as well.
// That's why we have this log message here to indicate something really messy thing happened.
logger.error("Message arrived to a PublishMQTT processor { topic:'" + message.getTopic() + "; payload:" + Arrays.toString(message.getPayload()) + "}");
} }
@Override @Override
public void deliveryComplete(IMqttDeliveryToken token) { public void deliveryComplete(String token) {
// Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application. // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
logger.trace("Received 'delivery complete' message from broker for:" + token.toString()); logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token);
} }
} }

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.adapters;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.mqtt.common.MqttCallback;
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TlsException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.SSL;
import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WS;
import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WSS;
public class HiveMqV5ClientAdapter implements MqttClient {
private final Mqtt5BlockingClient mqtt5BlockingClient;
private final MqttClientProperties clientProperties;
private final ComponentLog logger;
private MqttCallback callback;
public HiveMqV5ClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
this.mqtt5BlockingClient = createClient(clientProperties, logger);
this.clientProperties = clientProperties;
this.logger = logger;
}
@Override
public boolean isConnected() {
return mqtt5BlockingClient.getState().isConnected();
}
@Override
public void connect() {
logger.debug("Connecting to broker");
final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
.keepAlive(clientProperties.getKeepAliveInterval());
final boolean cleanSession = clientProperties.isCleanSession();
connectBuilder.cleanStart(cleanSession);
if (!cleanSession) {
connectBuilder.sessionExpiryInterval(clientProperties.getSessionExpiryInterval());
}
final String lastWillTopic = clientProperties.getLastWillTopic();
if (lastWillTopic != null) {
connectBuilder.willPublish()
.topic(lastWillTopic)
.payload(clientProperties.getLastWillMessage().getBytes())
.retain(clientProperties.getLastWillRetain())
.qos(MqttQos.fromCode(clientProperties.getLastWillQos()))
.applyWillPublish();
}
final String username = clientProperties.getUsername();
final String password = clientProperties.getPassword();
if (username != null && password != null) {
connectBuilder.simpleAuth()
.username(clientProperties.getUsername())
.password(password.getBytes(StandardCharsets.UTF_8))
.applySimpleAuth();
}
final Mqtt5Connect mqtt5Connect = connectBuilder.build();
mqtt5BlockingClient.connect(mqtt5Connect);
}
@Override
public void disconnect() {
logger.debug("Disconnecting client");
// Currently it is not possible to set timeout for disconnect with HiveMQ Client.
mqtt5BlockingClient.disconnect();
}
@Override
public void close() {
// there is no paho's close equivalent in hivemq client
}
@Override
public void publish(String topic, StandardMqttMessage message) {
logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos());
mqtt5BlockingClient.publishWith()
.topic(topic)
.payload(message.getPayload())
.retain(message.isRetained())
.qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos())))
.send();
}
@Override
public void subscribe(String topicFilter, int qos) {
Objects.requireNonNull(callback, "callback should be set");
logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
CompletableFuture<Mqtt5SubAck> futureAck = mqtt5BlockingClient.toAsync().subscribeWith()
.topicFilter(topicFilter)
.qos(Objects.requireNonNull(MqttQos.fromCode(qos)))
.callback(mqtt5Publish -> {
final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(
mqtt5Publish.getPayloadAsBytes(),
mqtt5Publish.getQos().getCode(),
mqtt5Publish.isRetain(),
mqtt5Publish.getTopic().toString());
callback.messageArrived(receivedMessage);
})
.send();
// Setting "listener" callback is only possible with async client, though sending subscribe message
// should happen in a blocking way to make sure the processor is blocked until ack is not arrived.
try {
Mqtt5SubAck ack = futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
logger.debug("Received mqtt5 subscribe ack: {}", ack);
} catch (Exception e) {
throw new MqttException("An error has occurred during sending subscribe message to broker", e);
}
}
@Override
public void setCallback(MqttCallback callback) {
this.callback = callback;
}
private static Mqtt5BlockingClient createClient(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
logger.debug("Creating Mqtt v5 client");
Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder()
.identifier(clientProperties.getClientId())
.serverHost(clientProperties.getBrokerUri().getHost());
int port = clientProperties.getBrokerUri().getPort();
if (port != -1) {
mqtt5ClientBuilder.serverPort(port);
}
// default is tcp
if (WS.equals(clientProperties.getScheme()) || WSS.equals(clientProperties.getScheme())) {
mqtt5ClientBuilder.webSocketConfig().applyWebSocketConfig();
}
if (SSL.equals(clientProperties.getScheme())) {
if (clientProperties.getTlsConfiguration().getTruststorePath() != null) {
mqtt5ClientBuilder
.sslConfig()
.trustManagerFactory(KeyStoreUtils.loadTrustManagerFactory(
clientProperties.getTlsConfiguration().getTruststorePath(),
clientProperties.getTlsConfiguration().getTruststorePassword(),
clientProperties.getTlsConfiguration().getTruststoreType().getType()))
.applySslConfig();
}
if (clientProperties.getTlsConfiguration().getKeystorePath() != null) {
mqtt5ClientBuilder
.sslConfig()
.keyManagerFactory(KeyStoreUtils.loadKeyManagerFactory(
clientProperties.getTlsConfiguration().getKeystorePath(),
clientProperties.getTlsConfiguration().getKeystorePassword(),
null,
clientProperties.getTlsConfiguration().getKeystoreType().getType()))
.applySslConfig();
}
}
return mqtt5ClientBuilder.buildBlocking();
}
}

View File

@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.adapters;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.mqtt.common.MqttCallback;
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.security.util.TlsConfiguration;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.Properties;
public class PahoMqttClientAdapter implements MqttClient {
public static final int DISCONNECT_TIMEOUT = 5000;
private final IMqttClient client;
private final MqttClientProperties clientProperties;
private final ComponentLog logger;
public PahoMqttClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) {
this.client = createClient(clientProperties, logger);
this.clientProperties = clientProperties;
this.logger = logger;
}
@Override
public boolean isConnected() {
return client.isConnected();
}
@Override
public void connect() {
logger.debug("Connecting to broker");
try {
final MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setCleanSession(clientProperties.isCleanSession());
connectOptions.setKeepAliveInterval(clientProperties.getKeepAliveInterval());
connectOptions.setMqttVersion(clientProperties.getMqttVersion().getVersionCode());
connectOptions.setConnectionTimeout(clientProperties.getConnectionTimeout());
final TlsConfiguration tlsConfiguration = clientProperties.getTlsConfiguration();
if (tlsConfiguration != null) {
connectOptions.setSSLProperties(transformSSLContextService(tlsConfiguration));
}
final String lastWillTopic = clientProperties.getLastWillTopic();
if (lastWillTopic != null) {
boolean lastWillRetain = clientProperties.getLastWillRetain() != null && clientProperties.getLastWillRetain();
connectOptions.setWill(lastWillTopic, clientProperties.getLastWillMessage().getBytes(), clientProperties.getLastWillQos(), lastWillRetain);
}
final String username = clientProperties.getUsername();
if (username != null) {
connectOptions.setUserName(username);
connectOptions.setPassword(clientProperties.getPassword().toCharArray());
}
client.connect(connectOptions);
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
throw new MqttException("An error has occurred during connecting to broker", e);
}
}
@Override
public void disconnect() {
logger.debug("Disconnecting client with timeout: {}", DISCONNECT_TIMEOUT);
try {
client.disconnect(DISCONNECT_TIMEOUT);
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
throw new MqttException("An error has occurred during disconnecting client with timeout: " + DISCONNECT_TIMEOUT, e);
}
}
@Override
public void close() {
logger.debug("Closing client");
try {
client.close();
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
throw new MqttException("An error has occurred during closing client", e);
}
}
@Override
public void publish(String topic, StandardMqttMessage message) {
logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos());
try {
client.publish(topic, message.getPayload(), message.getQos(), message.isRetained());
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
throw new MqttException("An error has occurred during publishing message to " + topic + " with QoS: " + message.getQos(), e);
}
}
@Override
public void subscribe(String topicFilter, int qos) {
logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
try {
client.subscribe(topicFilter, qos);
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
throw new MqttException("An error has occurred during subscribing to " + topicFilter + " with QoS: " + qos, e);
}
}
@Override
public void setCallback(MqttCallback callback) {
client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
callback.connectionLost(cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
logger.debug("Message arrived with id: {}", message.getId());
final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic);
callback.messageArrived(receivedMessage);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
callback.deliveryComplete(token.toString());
}
});
}
public static Properties transformSSLContextService(TlsConfiguration tlsConfiguration) {
final Properties properties = new Properties();
if (tlsConfiguration.getProtocol() != null) {
properties.setProperty("com.ibm.ssl.protocol", tlsConfiguration.getProtocol());
}
if (tlsConfiguration.getKeystorePath() != null) {
properties.setProperty("com.ibm.ssl.keyStore", tlsConfiguration.getKeystorePath());
}
if (tlsConfiguration.getKeystorePassword() != null) {
properties.setProperty("com.ibm.ssl.keyStorePassword", tlsConfiguration.getKeystorePassword());
}
if (tlsConfiguration.getKeystoreType() != null) {
properties.setProperty("com.ibm.ssl.keyStoreType", tlsConfiguration.getKeystoreType().getType());
}
if (tlsConfiguration.getTruststorePath() != null) {
properties.setProperty("com.ibm.ssl.trustStore", tlsConfiguration.getTruststorePath());
}
if (tlsConfiguration.getTruststorePassword() != null) {
properties.setProperty("com.ibm.ssl.trustStorePassword", tlsConfiguration.getTruststorePassword());
}
if (tlsConfiguration.getTruststoreType() != null) {
properties.setProperty("com.ibm.ssl.trustStoreType", tlsConfiguration.getTruststoreType().getType());
}
return properties;
}
private static org.eclipse.paho.client.mqttv3.MqttClient createClient(MqttClientProperties clientProperties, ComponentLog logger) {
logger.debug("Creating Mqtt v3 client");
try {
return new org.eclipse.paho.client.mqttv3.MqttClient(clientProperties.getBroker(), clientProperties.getClientId(), new MemoryPersistence());
} catch (org.eclipse.paho.client.mqttv3.MqttException e) {
throw new MqttException("An error has occurred during creating adapter for MQTT v3 client", e);
}
}
}

View File

@ -31,25 +31,25 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Properties;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
@ -57,62 +57,67 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor { public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
public static int DISCONNECT_TIMEOUT = 5000; private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
protected ComponentLog logger; protected ComponentLog logger;
protected IMqttClient mqttClient;
protected volatile String broker; protected MqttClientProperties clientProperties;
protected volatile String brokerUri;
protected volatile String clientID; protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
protected MqttConnectOptions connOpts; protected MqttClient mqttClient;
protected MemoryPersistence persistence = new MemoryPersistence();
public ProcessSessionFactory processSessionFactory; public ProcessSessionFactory processSessionFactory;
public static final Validator QOS_VALIDATOR = new Validator() { public static final Validator QOS_VALIDATOR = (subject, input, context) -> {
Integer inputInt = Integer.parseInt(input);
if (inputInt < 0 || inputInt > 2) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2.").build();
}
return new ValidationResult.Builder().subject(subject).valid(true).build();
};
@Override public static final Validator BROKER_VALIDATOR = (subject, input, context) -> {
public ValidationResult validate(String subject, String input, ValidationContext context) { try {
Integer inputInt = Integer.parseInt(input); URI brokerURI = new URI(input);
if (inputInt < 0 || inputInt > 2) { if (!EMPTY.equals(brokerURI.getPath())) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2").build(); return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is: " + brokerURI.getPath()).build();
} }
if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, brokerURI.getScheme())) {
return new ValidationResult.Builder().subject(subject).valid(false)
.explanation("scheme is invalid. Supported schemes are: " + getSupportedSchemeList()).build();
}
} catch (URISyntaxException e) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
}
return new ValidationResult.Builder().subject(subject).valid(true).build();
};
private static String getSupportedSchemeList() {
return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new));
}
public static final Validator RETAIN_VALIDATOR = (subject, input, context) -> {
if ("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)) {
return new ValidationResult.Builder().subject(subject).valid(true).build(); return new ValidationResult.Builder().subject(subject).valid(true).build();
} else {
return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
.validate(subject, input, context);
} }
}; };
public static final Validator BROKER_VALIDATOR = new Validator() { public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder()
.name("MQTT Specification Version")
@Override .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.")
public ValidationResult validate(String subject, String input, ValidationContext context) { .allowableValues(
try{ ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
URI brokerURI = new URI(input); ALLOWABLE_VALUE_MQTT_VERSION_500,
if (!"".equals(brokerURI.getPath())) { ALLOWABLE_VALUE_MQTT_VERSION_311,
return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build(); ALLOWABLE_VALUE_MQTT_VERSION_310
} )
if (!("tcp".equals(brokerURI.getScheme()) || "ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || "wss".equals(brokerURI.getScheme()))) { .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
return new ValidationResult.Builder().subject(subject).valid(false).explanation("only the 'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build(); .required(true)
} .build();
} catch (URISyntaxException e) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
}
return new ValidationResult.Builder().subject(subject).valid(true).build();
}
};
public static final Validator RETAIN_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)){
return new ValidationResult.Builder().subject(subject).valid(true).build();
} else{
return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
.validate(subject, input, context);
}
}
};
public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder()
.name("Broker URI") .name("Broker URI")
@ -123,7 +128,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.addValidator(BROKER_VALIDATOR) .addValidator(BROKER_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder()
.name("Client ID") .name("Client ID")
.description("MQTT client ID to use. If not set, a UUID will be generated.") .description("MQTT client ID to use. If not set, a UUID will be generated.")
@ -155,7 +159,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder()
.name("Last Will Topic") .name("Last Will Topic")
.description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.") .description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.")
@ -174,7 +177,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.name("Last Will Retain") .name("Last Will Retain")
.description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.") .description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false) .required(false)
.allowableValues("true","false") .allowableValues("true", "false")
.build(); .build();
public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder()
@ -190,7 +193,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder()
.name("Session state") .name("Session state")
.description("Whether to start afresh or resume previous flows. See the allowable value descriptions for more details.") .description("Whether to start a fresh or resume previous flows. See the allowable value descriptions for more details.")
.required(true) .required(true)
.allowableValues( .allowableValues(
ALLOWABLE_VALUE_CLEAN_SESSION_TRUE, ALLOWABLE_VALUE_CLEAN_SESSION_TRUE,
@ -199,16 +202,13 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue()) .defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue())
.build(); .build();
public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_SESSION_EXPIRY_INTERVAL = new PropertyDescriptor.Builder()
.name("MQTT Specification Version") .name("Session Expiry Interval")
.description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.") .description("After this interval the broker will expire the client and clear the session state.")
.allowableValues( .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
ALLOWABLE_VALUE_MQTT_VERSION_AUTO, .dependsOn(PROP_MQTT_VERSION, ALLOWABLE_VALUE_MQTT_VERSION_500)
ALLOWABLE_VALUE_MQTT_VERSION_311, .dependsOn(PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE)
ALLOWABLE_VALUE_MQTT_VERSION_310 .defaultValue(DEFAULT_SESSION_EXPIRY_INTERVAL)
)
.defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
.required(true)
.build(); .build();
public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder()
@ -232,8 +232,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
public static List<PropertyDescriptor> getAbstractPropertyDescriptors(){ public static List<PropertyDescriptor> getAbstractPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROP_BROKER_URI); descriptors.add(PROP_BROKER_URI);
descriptors.add(PROP_CLIENTID); descriptors.add(PROP_CLIENTID);
descriptors.add(PROP_USERNAME); descriptors.add(PROP_USERNAME);
@ -244,6 +244,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
descriptors.add(PROP_LAST_WILL_RETAIN); descriptors.add(PROP_LAST_WILL_RETAIN);
descriptors.add(PROP_LAST_WILL_QOS); descriptors.add(PROP_LAST_WILL_QOS);
descriptors.add(PROP_CLEAN_SESSION); descriptors.add(PROP_CLEAN_SESSION);
descriptors.add(PROP_SESSION_EXPIRY_INTERVAL);
descriptors.add(PROP_MQTT_VERSION); descriptors.add(PROP_MQTT_VERSION);
descriptors.add(PROP_CONN_TIMEOUT); descriptors.add(PROP_CONN_TIMEOUT);
descriptors.add(PROP_KEEP_ALIVE_INTERVAL); descriptors.add(PROP_KEEP_ALIVE_INTERVAL);
@ -257,7 +258,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet(); final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet();
if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) { if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set").build()); results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set.").build());
} }
final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet(); final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet();
@ -269,7 +270,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
// If any of the Last Will Properties are set // If any of the Last Will Properties are set
if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) { if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) {
// And any are not set // And any are not set
if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)){ if (!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)) {
// Then mark as invalid // Then mark as invalid
results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " + results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " +
"set, all must be set.").build()); "set, all must be set.").build());
@ -289,88 +290,34 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
return results; return results;
} }
public static Properties transformSSLContextService(SSLContextService sslContextService){ protected void onScheduled(final ProcessContext context) {
Properties properties = new Properties(); clientProperties = getMqttClientProperties(context);
if (sslContextService.getSslAlgorithm() != null) {
properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm());
}
if (sslContextService.getKeyStoreFile() != null) {
properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile());
}
if (sslContextService.getKeyStorePassword() != null) {
properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword());
}
if (sslContextService.getKeyStoreType() != null) {
properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType());
}
if (sslContextService.getTrustStoreFile() != null) {
properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile());
}
if (sslContextService.getTrustStorePassword() != null) {
properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword());
}
if (sslContextService.getTrustStoreType() != null) {
properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType());
}
return properties;
} }
protected void onScheduled(final ProcessContext context){ protected void stopClient() {
broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue(); // Since client is created in the onTrigger method it can happen that it never will be created because of an initialization error.
brokerUri = broker.endsWith("/") ? broker : broker + "/"; // We are preventing additional nullPtrException here, but the clean solution would be to create the client in the onScheduled method.
clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue(); if (mqttClient != null) {
try {
logger.info("Disconnecting client");
mqttClient.disconnect();
} catch (Exception e) {
logger.error("Error disconnecting MQTT client", e);
}
if (clientID == null) { try {
clientID = UUID.randomUUID().toString(); logger.info("Closing client");
} mqttClient.close();
} catch (Exception e) {
logger.error("Error closing MQTT client", e);
}
connOpts = new MqttConnectOptions();
connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
if (sslProp.isSet()) {
Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
connOpts.setSSLProperties(sslProps);
}
PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
if (lastWillTopicProp.isSet()){
String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
}
PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
if(usernameProp.isSet()) {
connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
}
}
protected void onStopped() {
try {
logger.info("Disconnecting client");
mqttClient.disconnect(DISCONNECT_TIMEOUT);
} catch(MqttException me) {
logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me);
}
try {
logger.info("Closing client");
mqttClient.close();
mqttClient = null; mqttClient = null;
} catch (MqttException me) {
logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me);
} }
} }
protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { protected MqttClient createMqttClient() throws TlsException {
return new MqttClient(broker, clientID, persistence); return mqttClientFactory.create(clientProperties, getLogger());
} }
@ -384,7 +331,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
onTrigger(context, session); onTrigger(context, session);
session.commitAsync(); session.commitAsync();
} catch (final Throwable t) { } catch (final Throwable t) {
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); getLogger().error("{} failed to process due to {}; rolling back session", this, t);
session.rollback(true); session.rollback(true);
throw t; throw t;
} }
@ -392,8 +339,52 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
protected boolean isConnected(){ protected boolean isConnected() {
return (mqttClient != null && mqttClient.isConnected()); return (mqttClient != null && mqttClient.isConnected());
} }
protected MqttClientProperties getMqttClientProperties(final ProcessContext context) {
final MqttClientProperties clientProperties = new MqttClientProperties();
try {
clientProperties.setBrokerUri(new URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid Broker URI", e);
}
String clientId = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
if (clientId == null) {
clientId = UUID.randomUUID().toString();
}
clientProperties.setClientId(clientId);
clientProperties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger()));
clientProperties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
clientProperties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS));
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
final PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
if (sslProp.isSet()) {
final SSLContextService sslContextService = (SSLContextService) sslProp.asControllerService();
clientProperties.setTlsConfiguration(sslContextService.createTlsConfiguration());
}
clientProperties.setLastWillTopic(context.getProperty(PROP_LAST_WILL_TOPIC).getValue());
clientProperties.setLastWillMessage(context.getProperty(PROP_LAST_WILL_MESSAGE).getValue());
final PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
clientProperties.setLastWillRetain(lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
clientProperties.setLastWillQos(context.getProperty(PROP_LAST_WILL_QOS).asInteger());
final PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
if (usernameProp.isSet()) {
clientProperties.setUsername(usernameProp.evaluateAttributeExpressions().getValue());
}
clientProperties.setPassword(context.getProperty(PROP_PASSWORD).getValue());
return clientProperties;
}
} }

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
public interface MqttCallback {
void connectionLost(Throwable cause);
void messageArrived(ReceivedMqttMessage message);
void deliveryComplete(String token);
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
public interface MqttClient {
/**
* Determines if this client is currently connected to an MQTT broker.
*
* @return whether the client is connected.
*/
boolean isConnected();
/**
* Connects the client to an MQTT broker.
*/
void connect();
/**
* Disconnects client from an MQTT broker.
*/
void disconnect();
/**
* Releases all resource associated with the client. After the client has
* been closed it cannot be reused. For instance attempts to connect will fail.
*/
void close();
/**
* Publishes a message to a topic on the MQTT broker.
*
* @param topic to deliver the message to, for example "pipe-1/flow-rate"
* @param message to deliver to the MQTT broker
*/
void publish(String topic, StandardMqttMessage message);
/**
* Subscribe to a topic.
*
* @param topicFilter the topic to subscribe to, which can include wildcards.
* @param qos the maximum quality of service at which to subscribe. Messages
* published at a lower quality of service will be received at the published
* QoS. Messages published at a higher quality of service will be received using
* the QoS specified on the subscribe.
*/
void subscribe(String topicFilter, int qos);
/**
* Sets a callback listener to use for events that happen asynchronously.
*
* @param callback for matching events
*/
void setCallback(MqttCallback callback);
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.mqtt.adapters.HiveMqV5ClientAdapter;
import org.apache.nifi.processors.mqtt.adapters.PahoMqttClientAdapter;
import org.apache.nifi.security.util.TlsException;
public class MqttClientFactory {
public MqttClient create(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
switch (clientProperties.getMqttVersion()) {
case MQTT_VERSION_3_AUTO:
case MQTT_VERSION_3_1:
case MQTT_VERSION_3_1_1:
return new PahoMqttClientAdapter(clientProperties, logger);
case MQTT_VERSION_5_0:
return new HiveMqV5ClientAdapter(clientProperties, logger);
default:
throw new MqttException("Unsupported Mqtt version: " + clientProperties.getMqttVersion());
}
}
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.security.util.TlsConfiguration;
import java.net.URI;
public class MqttClientProperties {
private URI brokerUri;
private String clientId;
private MqttVersion mqttVersion;
private int keepAliveInterval;
private int connectionTimeout;
private boolean cleanSession;
private Long sessionExpiryInterval;
private TlsConfiguration tlsConfiguration;
private String lastWillTopic;
private String lastWillMessage;
private Boolean lastWillRetain;
private Integer lastWillQos;
private String username;
private String password;
public String getBroker() {
return brokerUri.toString();
}
public MqttProtocolScheme getScheme() {
return MqttProtocolScheme.valueOf(brokerUri.getScheme().toUpperCase());
}
public URI getBrokerUri() {
return brokerUri;
}
public void setBrokerUri(URI brokerUri) {
this.brokerUri = brokerUri;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public MqttVersion getMqttVersion() {
return mqttVersion;
}
public void setMqttVersion(MqttVersion mqttVersion) {
this.mqttVersion = mqttVersion;
}
public int getKeepAliveInterval() {
return keepAliveInterval;
}
public void setKeepAliveInterval(int keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
}
public int getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public boolean isCleanSession() {
return cleanSession;
}
public void setCleanSession(boolean cleanSession) {
this.cleanSession = cleanSession;
}
public Long getSessionExpiryInterval() {
return sessionExpiryInterval;
}
public void setSessionExpiryInterval(Long sessionExpiryInterval) {
this.sessionExpiryInterval = sessionExpiryInterval;
}
public TlsConfiguration getTlsConfiguration() {
return tlsConfiguration;
}
public void setTlsConfiguration(TlsConfiguration tlsConfiguration) {
this.tlsConfiguration = tlsConfiguration;
}
public String getLastWillTopic() {
return lastWillTopic;
}
public void setLastWillTopic(String lastWillTopic) {
this.lastWillTopic = lastWillTopic;
}
public String getLastWillMessage() {
return lastWillMessage;
}
public void setLastWillMessage(String lastWillMessage) {
this.lastWillMessage = lastWillMessage;
}
public Boolean getLastWillRetain() {
return lastWillRetain;
}
public void setLastWillRetain(Boolean lastWillRetain) {
this.lastWillRetain = lastWillRetain;
}
public Integer getLastWillQos() {
return lastWillQos;
}
public void setLastWillQos(Integer lastWillQos) {
this.lastWillQos = lastWillQos;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}

View File

@ -18,7 +18,11 @@
package org.apache.nifi.processors.mqtt.common; package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1;
import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1_1;
import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_AUTO;
import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_5_0;
public class MqttConstants { public class MqttConstants {
@ -66,15 +70,16 @@ public class MqttConstants {
------------------------------------------ ------------------------------------------
*/ */
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO = public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT), new AllowableValue(String.valueOf(MQTT_VERSION_3_AUTO.getVersionCode()), MQTT_VERSION_3_AUTO.getDisplayName(),
"AUTO",
"Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker"); "Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker");
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_500 =
new AllowableValue(String.valueOf(MQTT_VERSION_5_0.getVersionCode()), MQTT_VERSION_5_0.getDisplayName());
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 = public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1), new AllowableValue(String.valueOf(MQTT_VERSION_3_1_1.getVersionCode()), MQTT_VERSION_3_1_1.getDisplayName());
"v3.1.1");
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 = public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1), new AllowableValue(String.valueOf(MQTT_VERSION_3_1.getVersionCode()), MQTT_VERSION_3_1.getDisplayName());
"v3.1.0");
} }

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
public class MqttException extends RuntimeException {
public MqttException(String message) {
super(message);
}
public MqttException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
public enum MqttProtocolScheme {
TCP,
SSL,
WS,
WSS
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
public enum MqttVersion {
MQTT_VERSION_3_AUTO(MqttConnectOptions.MQTT_VERSION_DEFAULT, "v3 AUTO"),
MQTT_VERSION_3_1(MqttConnectOptions.MQTT_VERSION_3_1, "v3.1.0"),
MQTT_VERSION_3_1_1(MqttConnectOptions.MQTT_VERSION_3_1_1, "v3.1.1"),
MQTT_VERSION_5_0(5, "v5.0");
private final int versionCode;
private final String displayName;
MqttVersion(int versionCode, String displayName) {
this.versionCode = versionCode;
this.displayName = displayName;
}
public int getVersionCode() {
return versionCode;
}
public String getDisplayName() {
return displayName;
}
public static MqttVersion fromVersionCode(int versionCode) {
for (MqttVersion version : values()) {
if (version.getVersionCode() == versionCode) {
return version;
}
}
throw new IllegalArgumentException("Unable to map MqttVersionCode from version code: " + versionCode);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
/**
* Represents a received MQTT message
*/
public class ReceivedMqttMessage extends StandardMqttMessage {
private String topic;
public ReceivedMqttMessage(byte[] payload, int qos, boolean retained, String topic) {
super(payload, qos, retained);
this.topic = topic;
}
public String getTopic() {
return topic;
}
public boolean isDuplicate() {
return false;
}
}

View File

@ -14,29 +14,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.mqtt.common; package org.apache.nifi.processors.mqtt.common;
import org.eclipse.paho.client.mqttv3.MqttMessage; /**
* Represents a MQTT message.
public class MQTTQueueMessage { */
private String topic; public class StandardMqttMessage {
private byte[] payload; private byte[] payload;
private int qos = 1; private int qos;
private boolean retained = false; private boolean retained;
private boolean duplicate = false;
public MQTTQueueMessage(String topic, MqttMessage message) { public StandardMqttMessage(byte[] payload, int qos, boolean retained) {
this.topic = topic; this.payload = payload;
payload = message.getPayload(); this.qos = qos;
qos = message.getQos(); this.retained = retained;
retained = message.isRetained();
duplicate = message.isDuplicate();
}
public String getTopic() {
return topic;
} }
public byte[] getPayload() { public byte[] getPayload() {
@ -50,8 +41,4 @@ public class MQTTQueueMessage {
public boolean isRetained() { public boolean isRetained() {
return retained; return retained;
} }
public boolean isDuplicate() {
return duplicate;
}
} }

View File

@ -18,8 +18,10 @@
package org.apache.nifi.processors.mqtt; package org.apache.nifi.processors.mqtt;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttTestClient; import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon; import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.security.util.SslContextFactory;
@ -29,10 +31,6 @@ import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -59,8 +57,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
} }
@Override @Override
public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { protected MqttClient createMqttClient() {
mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber); mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
return mqttTestClient; return mqttTestClient;
} }
} }
@ -111,10 +109,10 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException { public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException {
testRunner.run(1, false); testRunner.run(1, false);
ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor(); ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
MQTTQueueMessage mock = mock(MQTTQueueMessage.class); ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
when(mock.getPayload()).thenReturn(new byte[0]); when(mock.getPayload()).thenReturn(new byte[0]);
when(mock.getTopic()).thenReturn("testTopic"); when(mock.getTopic()).thenReturn("testTopic");
BlockingQueue<MQTTQueueMessage> mqttQueue = getMqttQueue(processor); BlockingQueue<ReceivedMqttMessage> mqttQueue = getMqttQueue(processor);
mqttQueue.add(mock); mqttQueue.add(mock);
ProcessSession session = testRunner.getProcessSessionFactory().createSession(); ProcessSession session = testRunner.getProcessSessionFactory().createSession();
@ -131,11 +129,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
} }
@Override @Override
public void internalPublish(final MqttMessage message, final String topicName) { public void internalPublish(final StandardMqttMessage message, final String topicName) {
try { mqttTestClient.publish(topicName, message);
mqttTestClient.publish(topicName, message);
} catch (MqttException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -17,13 +17,12 @@
package org.apache.nifi.processors.mqtt; package org.apache.nifi.processors.mqtt;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttTestClient; import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import java.util.Arrays; import java.util.Arrays;
@ -34,11 +33,12 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
@Override @Override
public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage; StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
assertEquals(Arrays.toString(payload), Arrays.toString(mqttQueueMessage.getPayload())); String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
assertEquals(qos, mqttQueueMessage.getQos()); assertEquals(Arrays.toString(payload), Arrays.toString(lastPublishedMessage.getPayload()));
assertEquals(retain, mqttQueueMessage.isRetained()); assertEquals(qos, lastPublishedMessage.getQos());
assertEquals(topic, mqttQueueMessage.getTopic()); assertEquals(retain, lastPublishedMessage.isRetained());
assertEquals(topic, lastPublishedTopic);
} }
private MqttTestClient mqttTestClient; private MqttTestClient mqttTestClient;
@ -50,8 +50,8 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
} }
@Override @Override
public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { protected MqttClient createMqttClient() throws MqttException {
mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher); mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
return mqttTestClient; return mqttTestClient;
} }
} }

View File

@ -17,24 +17,9 @@
package org.apache.nifi.processors.mqtt.common; package org.apache.nifi.processors.mqtt.common;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class MqttTestClient implements IMqttClient { public class MqttTestClient implements MqttClient {
public String serverURI;
public String clientId;
public AtomicBoolean connected = new AtomicBoolean(false); public AtomicBoolean connected = new AtomicBoolean(false);
@ -42,233 +27,66 @@ public class MqttTestClient implements IMqttClient {
public ConnectType type; public ConnectType type;
public enum ConnectType {Publisher, Subscriber} public enum ConnectType {Publisher, Subscriber}
public MQTTQueueMessage publishedMessage; private StandardMqttMessage lastPublishedMessage;
private String lastPublishedTopic;
public String subscribedTopic; public String subscribedTopic;
public int subscribedQos; public int subscribedQos;
public MqttTestClient(String serverURI, String clientId, ConnectType type) throws MqttException { public MqttTestClient(ConnectType type) {
this.serverURI = serverURI;
this.clientId = clientId;
this.type = type; this.type = type;
} }
@Override
public void connect() throws MqttSecurityException, MqttException {
connected.set(true);
}
@Override
public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {
connected.set(true);
}
@Override
public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException {
return null;
}
@Override
public void disconnect() throws MqttException {
connected.set(false);
}
@Override
public void disconnect(long quiesceTimeout) throws MqttException {
connected.set(false);
}
@Override
public void disconnectForcibly() throws MqttException {
connected.set(false);
}
@Override
public void disconnectForcibly(long disconnectTimeout) throws MqttException {
connected.set(false);
}
@Override
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException {
connected.set(false);
}
@Override
public void subscribe(String topicFilter) throws MqttException, MqttSecurityException {
subscribedTopic = topicFilter;
subscribedQos = -1;
}
@Override
public void subscribe(String[] topicFilters) throws MqttException {
throw new UnsupportedOperationException("Multiple topic filters is not supported");
}
@Override
public void subscribe(String topicFilter, int qos) throws MqttException {
subscribedTopic = topicFilter;
subscribedQos = qos;
}
@Override
public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
throw new UnsupportedOperationException("Multiple topic filters is not supported");
}
@Override
public void subscribe(String s, IMqttMessageListener iMqttMessageListener) throws MqttException, MqttSecurityException {
}
@Override
public void subscribe(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
}
@Override
public void subscribe(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException {
}
@Override
public void subscribe(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
}
@Override
public IMqttToken subscribeWithResponse(String s) throws MqttException {
return null;
}
@Override
public IMqttToken subscribeWithResponse(String s, IMqttMessageListener iMqttMessageListener) throws MqttException {
return null;
}
@Override
public IMqttToken subscribeWithResponse(String s, int i) throws MqttException {
return null;
}
@Override
public IMqttToken subscribeWithResponse(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException {
return null;
}
@Override
public IMqttToken subscribeWithResponse(String[] strings) throws MqttException {
return null;
}
@Override
public IMqttToken subscribeWithResponse(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
return null;
}
@Override
public IMqttToken subscribeWithResponse(String[] strings, int[] ints) throws MqttException {
return null;
}
@Override
public IMqttToken subscribeWithResponse(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
return null;
}
@Override
public void unsubscribe(String topicFilter) throws MqttException {
subscribedTopic = "";
subscribedQos = -2;
}
@Override
public void unsubscribe(String[] topicFilters) throws MqttException {
throw new UnsupportedOperationException("Multiple topic filters is not supported");
}
@Override
public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
switch (type) {
case Publisher:
publishedMessage = new MQTTQueueMessage(topic, message);
break;
case Subscriber:
try {
mqttCallback.messageArrived(topic, message);
} catch (Exception e) {
throw new MqttException(e);
}
break;
}
}
@Override
public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException {
switch (type) {
case Publisher:
publishedMessage = new MQTTQueueMessage(topic, message);
break;
case Subscriber:
try {
mqttCallback.messageArrived(topic, message);
} catch (Exception e) {
throw new MqttException(e);
}
break;
}
}
@Override
public void setCallback(MqttCallback callback) {
this.mqttCallback = callback;
}
@Override
public MqttTopic getTopic(String topic) {
return null;
}
@Override @Override
public boolean isConnected() { public boolean isConnected() {
return connected.get(); return connected.get();
} }
@Override @Override
public String getClientId() { public void connect() {
return clientId; connected.set(true);
} }
@Override @Override
public String getServerURI() { public void disconnect() {
return serverURI; connected.set(false);
} }
@Override @Override
public IMqttDeliveryToken[] getPendingDeliveryTokens() { public void close() {
return new IMqttDeliveryToken[0];
}
@Override
public void setManualAcks(boolean b) {
} }
@Override @Override
public void reconnect() throws MqttException { public void publish(String topic, StandardMqttMessage message) {
switch (type) {
case Publisher:
lastPublishedMessage = message;
lastPublishedTopic = topic;
break;
case Subscriber:
mqttCallback.messageArrived(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic));
break;
}
} }
@Override @Override
public void messageArrivedComplete(int i, int i1) throws MqttException { public void subscribe(String topicFilter, int qos) {
subscribedTopic = topicFilter;
subscribedQos = qos;
} }
@Override @Override
public void close() throws MqttException { public void setCallback(MqttCallback callback) {
this.mqttCallback = callback;
}
public StandardMqttMessage getLastPublishedMessage() {
return lastPublishedMessage;
}
public String getLastPublishedTopic() {
return lastPublishedTopic;
} }
} }

View File

@ -28,8 +28,6 @@ import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.lang.reflect.Field; import java.lang.reflect.Field;
@ -66,7 +64,7 @@ public abstract class TestConsumeMqttCommon {
private static final int LEAST_ONE = 1; private static final int LEAST_ONE = 1;
private static final int EXACTLY_ONCE = 2; private static final int EXACTLY_ONCE = 2;
public abstract void internalPublish(MqttMessage message, String topicName); public abstract void internalPublish(StandardMqttMessage message, String topicName);
@Test @Test
public void testClientIDConfiguration() { public void testClientIDConfiguration() {
@ -295,10 +293,8 @@ public abstract class TestConsumeMqttCommon {
testRunner.assertValid(); testRunner.assertValid();
MqttMessage innerMessage = new MqttMessage(); final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array();
innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array()); ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, "testTopic");
innerMessage.setQos(2);
MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage);
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext()); consumeMQTT.onScheduled(testRunner.getProcessContext());
@ -313,7 +309,7 @@ public abstract class TestConsumeMqttCommon {
Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue"); Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
f.setAccessible(true); f.setAccessible(true);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT); LinkedBlockingQueue<ReceivedMqttMessage> queue = (LinkedBlockingQueue<ReceivedMqttMessage>) f.get(consumeMQTT);
queue.add(testMessage); queue.add(testMessage);
consumeMQTT.onUnscheduled(testRunner.getProcessContext()); consumeMQTT.onUnscheduled(testRunner.getProcessContext());
@ -551,7 +547,7 @@ public abstract class TestConsumeMqttCommon {
private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException { private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient"); Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
f.setAccessible(true); f.setAccessible(true);
IMqttClient mqttClient = (IMqttClient) f.get(processor); MqttClient mqttClient = (MqttClient) f.get(processor);
return mqttClient.isConnected(); return mqttClient.isConnected();
} }
@ -563,10 +559,10 @@ public abstract class TestConsumeMqttCommon {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException { public static BlockingQueue<ReceivedMqttMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue"); Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
mqttQueueField.setAccessible(true); mqttQueueField.setAccessible(true);
return (BlockingQueue<MQTTQueueMessage>) mqttQueueField.get(consumeMQTT); return (BlockingQueue<ReceivedMqttMessage>) mqttQueueField.get(consumeMQTT);
} }
public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
@ -585,11 +581,7 @@ public abstract class TestConsumeMqttCommon {
} }
private void publishMessage(final String payload, final int qos) { private void publishMessage(final String payload, final int qos) {
final MqttMessage message = new MqttMessage(); final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
message.setPayload(payload.getBytes(StandardCharsets.UTF_8));
message.setQos(qos);
message.setRetained(false);
internalPublish(message, "testTopic"); internalPublish(message, "testTopic");
} }
} }