NIFI-1808 Refactored MQTT processors, and added proper unit and integration tests

This closes 
This commit is contained in:
jpercivall 2016-05-24 10:05:07 -04:00 committed by Oleg Zhurakousky
parent 7a4c71fec7
commit 7923fd04c3
26 changed files with 2667 additions and 485 deletions

View File

@ -27,6 +27,11 @@
<source.skip>true</source.skip> <source.skip>true</source.skip>
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mqtt-processors</artifactId> <artifactId>nifi-mqtt-processors</artifactId>

View File

@ -73,14 +73,38 @@
<version>1.1</version> <version>1.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.glassfish.tyrus.bundles</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>tyrus-standalone-client-jdk</artifactId> <artifactId>nifi-ssl-context-service-api</artifactId>
<version>1.12</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.glassfish.tyrus</groupId> <groupId>io.moquette</groupId>
<artifactId>tyrus-container-grizzly-client</artifactId> <artifactId>moquette-broker</artifactId>
<version>1.12</version> <version>0.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/integration/TestConsumeMQTT.java</exclude>
<exclude>**/integration/TestConsumeMqttSSL.java</exclude>
<exclude>**/integration/TestPublishAndSubscribeMqttIntegration.java</exclude>
<exclude>**/integration/TestPublishMQTT.java</exclude>
<exclude>**/integration/TestPublishMqttSSL.java</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -0,0 +1,342 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.io.OutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection
@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
@SeeAlso({PublishMQTT.class})
@WritesAttributes({
@WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was the message source"),
@WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which message was received"),
@WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service for this message."),
@WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."),
@WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " +
"on the topic.")})
public class ConsumeMQTT extends AbstractMQTTProcessor {
public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos";
public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";
public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder()
.name("Topic Filter")
.description("The MQTT topic filter to designate the topics to subscribe to.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
.name("Quality of Service(QoS)")
.description("The Quality of Service(QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
.required(true)
.defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
.allowableValues(
ALLOWABLE_VALUE_QOS_0,
ALLOWABLE_VALUE_QOS_1,
ALLOWABLE_VALUE_QOS_2)
.build();
public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder()
.name("Max Queue Size")
.description("The MQTT messages are always being sent to subscribers on a topic. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this " +
"processor then a back up can occur. This property specifies the maximum number of messages this processor will hold in memory at one time.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
private static int DISCONNECT_TIMEOUT = 5000;
private volatile long maxQueueSize;
private volatile int qos;
private volatile String topicFilter;
private final AtomicBoolean scheduled = new AtomicBoolean(false);
private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
public static final Relationship REL_MESSAGE = new Relationship.Builder()
.name("Message")
.description("The MQTT message output")
.build();
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
static{
final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors();
innerDescriptorsList.add(PROP_TOPIC_FILTER);
innerDescriptorsList.add(PROP_QOS);
innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>();
innerRelationshipsSet.add(REL_MESSAGE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
@Override
public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
// resize the receive buffer, but preserve data
if (descriptor == PROP_MAX_QUEUE_SIZE) {
// it's a mandatory integer, never null
int newSize = Integer.valueOf(newValue);
if (mqttQueue != null) {
int msgPending = mqttQueue.size();
if (msgPending > newSize) {
logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.",
new Object[]{newSize, msgPending});
return;
}
LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
mqttQueue.drainTo(newBuffer);
mqttQueue = newBuffer;
}
}
}
@Override
public Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection<ValidationResult> results = super.customValidate(context);
int newSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger();
if (mqttQueue == null) {
mqttQueue = new LinkedBlockingQueue<>(context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger());
}
int msgPending = mqttQueue.size();
if (msgPending > newSize) {
results.add(new ValidationResult.Builder()
.valid(false)
.subject("ConsumeMQTT Configuration")
.explanation(String.format("%s (%d) is smaller than the number of messages pending (%d).",
PROP_MAX_QUEUE_SIZE.getDisplayName(), newSize, msgPending))
.build());
}
return results;
}
@Override
protected void init(final ProcessorInitializationContext context) {
logger = getLogger();
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException {
qos = context.getProperty(PROP_QOS).asInteger();
maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
buildClient(context);
scheduled.set(true);
}
@OnUnscheduled
public void onUnscheduled(final ProcessContext context) {
scheduled.set(false);
mqttClientConnectLock.writeLock().lock();
try {
if(isConnected()) {
mqttClient.disconnect(DISCONNECT_TIMEOUT);
logger.info("Disconnected the MQTT client.");
}
} catch(MqttException me) {
logger.error("Failed when disconnecting the MQTT client.", me);
} finally {
mqttClientConnectLock.writeLock().unlock();
}
}
@OnStopped
public void onStopped(final ProcessContext context) throws IOException {
if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
logger.info("Finishing processing leftover messages");
ProcessSession session = processSessionFactory.createSession();
transferQueue(session);
} else {
if (mqttQueue!= null && !mqttQueue.isEmpty()){
throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " +
"clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages " +
"in the MQTT internal queue cannot finish processing until until the processor is triggered to run.");
}
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
if (mqttQueue.isEmpty() && !isConnected() && scheduled.get()){
logger.info("Queue is empty and client is not connected. Attempting to reconnect.");
try {
reconnect();
} catch (MqttException e) {
logger.error("Connection to " + broker + " lost (or was never connected) and ontrigger connect failed. Yielding processor", e);
context.yield();
}
}
if (mqttQueue.isEmpty()) {
return;
}
transferQueue(session);
}
private void transferQueue(ProcessSession session){
while (!mqttQueue.isEmpty()) {
FlowFile messageFlowfile = session.create();
final MQTTQueueMessage mqttMessage = mqttQueue.peek();
Map<String, String> attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, broker);
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(mqttMessage.getPayload());
}
});
String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString();
session.getProvenanceReporter().receive(messageFlowfile, transitUri);
session.transfer(messageFlowfile, REL_MESSAGE);
mqttQueue.remove(mqttMessage);
session.commit();
}
}
private class ConsumeMQTTCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
logger.warn("Connection to " + broker + " lost", cause);
try {
reconnect();
} catch (MqttException e) {
logger.error("Connection to " + broker + " lost and callback re-connect failed.");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (logger.isInfoEnabled()) {
logger.info("MQTT message arrived on topic:" + topic);
}
if (mqttQueue.size() >= maxQueueSize){
throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
} else {
mqttQueue.add(new MQTTQueueMessage(topic, message));
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.warn("Received MQTT 'delivery complete' message to subscriber:"+ token);
}
}
private void reconnect() throws MqttException {
mqttClientConnectLock.writeLock().lock();
try {
if (!mqttClient.isConnected()) {
setAndConnectClient(new ConsumeMQTTCallback());
mqttClient.subscribe(topicFilter, qos);
}
} finally {
mqttClientConnectLock.writeLock().unlock();
}
}
private boolean isConnected(){
return (mqttClient != null && mqttClient.isConnected());
}
}

View File

@ -1,191 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.io.OutputStream;
import java.io.IOException;
@Tags({"GetMQTT"})
@CapabilityDescription("Gets messages from an MQTT broker")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT broker that was the message source"),
@WritesAttribute(attribute="topic", description="MQTT topic on which message was received")})
public class GetMQTT extends AbstractProcessor implements MqttCallback {
String topic;
String broker;
String clientID;
double lastTime;
boolean firstTime = true;
MemoryPersistence persistence = new MemoryPersistence();
MqttClient mqttClient;
LinkedBlockingQueue<MQTTQueueMessage> mqttQueue = new LinkedBlockingQueue<>();
public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor
.Builder().name("Broker address")
.description("MQTT broker address (e.g. tcp://localhost:1883)")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROPERTY_MQTT_TOPIC = new PropertyDescriptor
.Builder().name("MQTT topic")
.description("MQTT topic to subscribe to")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor
.Builder().name("MQTT client ID")
.description("MQTT client ID to use")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship RELATIONSHIP_MQTTMESSAGE = new Relationship.Builder()
.name("MQTTMessage")
.description("MQTT message output")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
public void connectionLost(Throwable t) {
getLogger().info("Connection to " + broker + " lost");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
mqttQueue.add(new MQTTQueueMessage(topic, message.getPayload()));
}
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(PROPERTY_BROKER_ADDRESS);
descriptors.add(PROPERTY_MQTT_TOPIC);
descriptors.add(PROPERTY_MQTT_CLIENTID);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(RELATIONSHIP_MQTTMESSAGE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
try {
broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
topic = context.getProperty(PROPERTY_MQTT_TOPIC).getValue();
clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
mqttClient = new MqttClient(broker, clientID, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
mqttClient.setCallback(this);
connOpts.setCleanSession(true);
getLogger().info("Connecting to broker: " + broker);
mqttClient.connect(connOpts);
mqttClient.subscribe(topic, 0);
} catch(MqttException me) {
getLogger().error("msg "+me.getMessage());
}
}
@OnUnscheduled
public void onUnscheduled(final ProcessContext context) {
try {
mqttClient.disconnect();
} catch(MqttException me) {
}
getLogger().error("Disconnected");
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List messageList = new LinkedList();
mqttQueue.drainTo(messageList);
if (messageList.isEmpty())
return;
Iterator iterator = messageList.iterator();
while (iterator.hasNext()) {
FlowFile messageFlowfile = session.create();
final MQTTQueueMessage m = (MQTTQueueMessage)iterator.next();
messageFlowfile = session.putAttribute(messageFlowfile, "broker", broker);
messageFlowfile = session.putAttribute(messageFlowfile, "topic", topic);
messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(m.message);
}
});
session.transfer(messageFlowfile, RELATIONSHIP_MQTTMESSAGE);
session.commit();
}
}
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.stream.io.StreamUtils;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.io.InputStream;
import java.io.IOException;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"publish", "MQTT", "IOT"})
@CapabilityDescription("Publishes a message to an MQTT topic")
@SeeAlso({ConsumeMQTT.class})
public class PublishMQTT extends AbstractMQTTProcessor {
public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor.Builder()
.name("Topic")
.description("The topic to publish the message to.")
.expressionLanguageSupported(true)
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
.name("Quality of Service(QoS)")
.description("The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'. " +
"Expression language is allowed in order to support publishing messages with different QoS but the end value of the property must be either '0', '1' or '2'. ")
.required(true)
.expressionLanguageSupported(true)
.addValidator(QOS_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_RETAIN = new PropertyDescriptor.Builder()
.name("Retain Message")
.description("Whether or not the retain flag should be set on the MQTT message.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(RETAIN_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are transferred to this relationship.")
.build();
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
static {
final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors();
innerDescriptorsList.add(PROP_TOPIC);
innerDescriptorsList.add(PROP_QOS);
innerDescriptorsList.add(PROP_RETAIN);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<>();
innerRelationshipsSet.add(REL_SUCCESS);
innerRelationshipsSet.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
@Override
protected void init(final ProcessorInitializationContext context) {
logger = getLogger();
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
buildClient(context);
}
@OnStopped
public void onStop(final ProcessContext context) {
mqttClientConnectLock.writeLock().lock();
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
logger.info("Disconnected the MQTT client.");
}
} catch(MqttException me) {
logger.error("Failed when disconnecting the MQTT client.", me);
} finally {
mqttClientConnectLock.writeLock().unlock();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowfile = session.get();
if (flowfile == null) {
return;
}
if(mqttClient == null || !mqttClient.isConnected()){
logger.info("Was disconnected from client or was never connected, attempting to connect.");
try {
reconnect();
} catch (MqttException e) {
context.yield();
session.transfer(flowfile, REL_FAILURE);
logger.error("MQTT client is disconnected and re-connecting failed. Transferring FlowFile to fail and yielding", e);
return;
}
}
// get the MQTT topic
String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
if (topic == null || topic.isEmpty()) {
logger.warn("Evaluation of the topic property returned null or evaluated to be empty, routing to failure");
session.transfer(flowfile, REL_FAILURE);
return;
}
// do the read
final byte[] messageContent = new byte[(int) flowfile.getSize()];
session.read(flowfile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, messageContent, true);
}
});
int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
final MqttMessage mqttMessage = new MqttMessage(messageContent);
mqttMessage.setQos(qos);
mqttMessage.setPayload(messageContent);
mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean());
try {
mqttClientConnectLock.readLock().lock();
try {
/*
* Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
* MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
*/
mqttClient.publish(topic, mqttMessage);
} finally {
mqttClientConnectLock.readLock().unlock();
}
session.transfer(flowfile, REL_SUCCESS);
} catch(MqttException me) {
logger.error("Failed to publish message.", me);
session.transfer(flowfile, REL_FAILURE);
}
}
private class PublishMQTTCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
logger.warn("Connection to " + broker + " lost", cause);
try {
reconnect();
} catch (MqttException e) {
logger.error("Connection to " + broker + " lost and re-connect failed");
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
}
}
private void reconnect() throws MqttException {
mqttClientConnectLock.writeLock().lock();
try {
if (!mqttClient.isConnected()) {
setAndConnectClient(new PublishMQTTCallback());
getLogger().info("Connecting to broker: " + broker);
}
} finally {
mqttClientConnectLock.writeLock().unlock();
}
}
}

View File

@ -1,193 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.commons.io.IOUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.io.InputStream;
import java.io.IOException;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"PutMQTT"})
@CapabilityDescription("Publishes message to an MQTT topic")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="topic", description="Topic to publish message to")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class PutMQTT extends AbstractProcessor implements MqttCallback {
String broker;
String clientID;
MemoryPersistence persistence = new MemoryPersistence();
MqttClient mqttClient;
public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor
.Builder().name("Broker address")
.description("MQTT broker address (e.g. tcp://localhost:1883)")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor
.Builder().name("MQTT client ID")
.description("MQTT client ID to use")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
public void connectionLost(Throwable t) {
getLogger().info("Connection to " + broker + " lost");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(PROPERTY_BROKER_ADDRESS);
descriptors.add(PROPERTY_MQTT_CLIENTID);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
try {
broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue();
clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue();
mqttClient = new MqttClient(broker, clientID, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
mqttClient.setCallback(this);
connOpts.setCleanSession(true);
getLogger().info("Connecting to broker: " + broker);
mqttClient.connect(connOpts);
} catch(MqttException me) {
getLogger().error("msg "+me.getMessage());
}
}
@OnUnscheduled
public void onUnscheduled(final ProcessContext context) {
try {
mqttClient.disconnect();
} catch(MqttException me) {
}
getLogger().error("Disconnected");
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final AtomicReference<String> message = new AtomicReference<>();
FlowFile flowfile = session.get();
message.set("");
// get the MQTT topic
String topic = flowfile.getAttribute("topic");
if (topic == null) {
getLogger().error("No topic attribute on flowfile");
session.remove(flowfile);
return;
}
// do the read
session.read(flowfile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try{
message.set(IOUtils.toString(in));
}catch(Exception e){
getLogger().error("Failed to read flowfile " + e.getMessage());
}
}
});
try {
session.remove(flowfile);
} catch (Exception e) {
getLogger().error("Failed to remove flowfile " + e.getMessage());
return;
}
String output = message.get();
if ((output == null) || output.isEmpty()) {
return;
}
try {
mqttClient.publish(topic, output.getBytes(), 0, false);
} catch(MqttException me) {
getLogger().error("msg "+me.getMessage());
}
}
}

View File

@ -0,0 +1,372 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2;
public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
protected ComponentLog logger;
protected IMqttClient mqttClient;
protected final ReadWriteLock mqttClientConnectLock = new ReentrantReadWriteLock(true);
protected volatile String broker;
protected volatile String clientID;
protected MqttConnectOptions connOpts;
protected MemoryPersistence persistence = new MemoryPersistence();
public ProcessSessionFactory processSessionFactory;
public static final Validator QOS_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
Integer inputInt = Integer.parseInt(input);
if (inputInt < 0 || inputInt > 2) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2").build();
}
return new ValidationResult.Builder().subject(subject).valid(true).build();
}
};
public static final Validator BROKER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
try{
URI brokerURI = new URI(input);
if (!"".equals(brokerURI.getPath())) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
}
if (!("tcp".equals(brokerURI.getScheme()) || "ssl".equals(brokerURI.getScheme()))) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("only the 'tcp' and 'ssl' schemes are supported.").build();
}
} catch (URISyntaxException e) {
return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
}
return new ValidationResult.Builder().subject(subject).valid(true).build();
}
};
public static final Validator RETAIN_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)){
return new ValidationResult.Builder().subject(subject).valid(true).build();
} else{
return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
.validate(subject, input, context);
}
}
};
public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder()
.name("Broker URI")
.description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp' and 'ssl' schemes are supported. In order to use 'ssl', the SSL Context " +
"Service property must be set.")
.required(true)
.addValidator(BROKER_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder()
.name("Client ID")
.description("MQTT client ID to use")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_USERNAME = new PropertyDescriptor.Builder()
.name("Username")
.description("Username to use when connecting to the broker")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password to use when connecting to the broker")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder()
.name("Last Will Topic")
.description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_LAST_WILL_MESSAGE = new PropertyDescriptor.Builder()
.name("Last Will Message")
.description("The message to send as the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_LAST_WILL_RETAIN = new PropertyDescriptor.Builder()
.name("Last Will Retain")
.description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false)
.allowableValues("true","false")
.build();
public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder()
.name("Last Will QoS Level")
.description("QoS level to be used when publishing the Last Will Message")
.required(false)
.allowableValues(
ALLOWABLE_VALUE_QOS_0,
ALLOWABLE_VALUE_QOS_1,
ALLOWABLE_VALUE_QOS_2
)
.build();
public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder()
.name("Session state")
.description("Whether to start afresh or resume previous flows. See the allowable value descriptions for more details.")
.required(true)
.allowableValues(
ALLOWABLE_VALUE_CLEAN_SESSION_TRUE,
ALLOWABLE_VALUE_CLEAN_SESSION_FALSE
)
.defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue())
.build();
public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder()
.name("MQTT Specification Version")
.description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.")
.allowableValues(
ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
ALLOWABLE_VALUE_MQTT_VERSION_311,
ALLOWABLE_VALUE_MQTT_VERSION_310
)
.defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
.required(true)
.build();
public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder()
.name("Connection Timeout (seconds)")
.description("Maximum time interval the client will wait for the network connection to the MQTT server " +
"to be established. The default timeout is 30 seconds. " +
"A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.")
.required(false)
.defaultValue("30")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_KEEP_ALIVE_INTERVAL = new PropertyDescriptor.Builder()
.name("Keep Alive Interval (seconds)")
.description("Defines the maximum time interval between messages sent or received. It enables the " +
"client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. " +
"The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, " +
"the client sends a very small \"ping\" message, which the server will acknowledge. A value of 0 disables keepalive processing in the client.")
.required(false)
.defaultValue("60")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(PROP_BROKER_URI);
descriptors.add(PROP_CLIENTID);
descriptors.add(PROP_USERNAME);
descriptors.add(PROP_PASSWORD);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(PROP_LAST_WILL_TOPIC);
descriptors.add(PROP_LAST_WILL_MESSAGE);
descriptors.add(PROP_LAST_WILL_RETAIN);
descriptors.add(PROP_LAST_WILL_QOS);
descriptors.add(PROP_CLEAN_SESSION);
descriptors.add(PROP_MQTT_VERSION);
descriptors.add(PROP_CONN_TIMEOUT);
descriptors.add(PROP_KEEP_ALIVE_INTERVAL);
return descriptors;
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
final boolean usernameSet = validationContext.getProperty(PROP_USERNAME).isSet();
final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet();
if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set").build());
}
final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet();
final boolean lastWillMessageSet = validationContext.getProperty(PROP_LAST_WILL_MESSAGE).isSet();
final boolean lastWillRetainSet = validationContext.getProperty(PROP_LAST_WILL_RETAIN).isSet();
final boolean lastWillQosSet = validationContext.getProperty(PROP_LAST_WILL_QOS).isSet();
// If any of the Last Will Properties are set
if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) {
// And any are not set
if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)){
// Then mark as invalid
results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " +
"set, all must be set.").build());
}
}
try {
URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).getValue());
if (brokerURI.getScheme().equalsIgnoreCase("ssl") && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " + PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is used in " +
"the broker URI, the SSL Context Service must be set.").build());
}
} catch (URISyntaxException e) {
results.add(new ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false).explanation("it is not valid URI syntax.").build());
}
return results;
}
public static Properties transformSSLContextService(SSLContextService sslContextService){
Properties properties = new Properties();
properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm());
properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile());
properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword());
properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType());
properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile());
properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword());
properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType());
return properties;
}
protected void buildClient(ProcessContext context){
try {
broker = context.getProperty(PROP_BROKER_URI).getValue();
clientID = context.getProperty(PROP_CLIENTID).getValue();
connOpts = new MqttConnectOptions();
connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
if (sslProp.isSet()) {
Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
connOpts.setSSLProperties(sslProps);
}
PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
if (lastWillTopicProp.isSet()){
String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
}
PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
if(usernameProp.isSet()) {
connOpts.setUserName(usernameProp.getValue());
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
}
mqttClientConnectLock.writeLock().lock();
try{
mqttClient = getMqttClient(broker, clientID, persistence);
} finally {
mqttClientConnectLock.writeLock().unlock();
}
} catch(MqttException me) {
logger.error("Failed to initialize the connection to the " + me.getMessage());
}
}
protected IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
return new MqttClient(broker, clientID, persistence);
}
@Override
public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
if (processSessionFactory == null) {
processSessionFactory = sessionFactory;
}
ProcessSession session = sessionFactory.createSession();
try {
onTrigger(context, session);
session.commit();
} catch (final Throwable t) {
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
session.rollback(true);
throw t;
}
}
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
// Caller should obtain the necessary lock
protected void setAndConnectClient(MqttCallback mqttCallback) throws MqttException {
mqttClient = getMqttClient(broker, clientID, persistence);
mqttClient.setCallback(mqttCallback);
mqttClient.connect(connOpts);
}
}

View File

@ -15,15 +15,43 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.mqtt; package org.apache.nifi.processors.mqtt.common;
public class MQTTQueueMessage import org.eclipse.paho.client.mqttv3.MqttMessage;
{
public String topic;
public byte[] message;
public MQTTQueueMessage(String topic, byte[] message) { public class MQTTQueueMessage {
private String topic;
private byte[] payload;
private int qos = 1;
private boolean retained = false;
private boolean duplicate = false;
public MQTTQueueMessage(String topic, MqttMessage message) {
this.topic = topic; this.topic = topic;
this.message = message; payload = message.getPayload();
qos = message.getQos();
retained = message.isRetained();
duplicate = message.isDuplicate();
}
public String getTopic() {
return topic;
}
public byte[] getPayload() {
return payload;
}
public int getQos() {
return qos;
}
public boolean isRetained() {
return retained;
}
public boolean isDuplicate() {
return duplicate;
} }
} }

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.components.AllowableValue;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
public class MqttConstants {
/*
------------------------------------------
Clean Session Values
------------------------------------------
*/
public static final AllowableValue ALLOWABLE_VALUE_CLEAN_SESSION_TRUE =
new AllowableValue("true", "Clean Session", "Client and Server discard any previous session and start a new " +
"one. This session lasts as long as the network connection. " +
"State data associated with this session is not reused in any subsequent session");
public static final AllowableValue ALLOWABLE_VALUE_CLEAN_SESSION_FALSE =
new AllowableValue("false", "Resume Session", "Server resumes communications with the client based on state from " +
"the current session (as identified by the ClientID). The client and server store the session after " +
"the client and server are disconnected. After the disconnection of a session that was not a clean session, " +
"the server stores further QoS 1 and QoS 2 messages that match any subscriptions that the client had at " +
"the time of disconnection as part of the session state");
/*
------------------------------------------
QoS Values
------------------------------------------
*/
public static final AllowableValue ALLOWABLE_VALUE_QOS_0 =
new AllowableValue("0", "0 - At most once", "Best effort delivery. A message wont be acknowledged by the receiver or stored and redelivered by the sender. " +
"This is often called “fire and forget” and provides the same guarantee as the underlying TCP protocol.");
public static final AllowableValue ALLOWABLE_VALUE_QOS_1 =
new AllowableValue("1", "1 - At least once", "Guarantees that a message will be delivered at least once to the receiver. " +
"The message can also be delivered more than once");
public static final AllowableValue ALLOWABLE_VALUE_QOS_2 =
new AllowableValue("2", "2 - Exactly once", "Guarantees that each message is received only once by the counterpart. It is the safest and also " +
"the slowest quality of service level. The guarantee is provided by two round-trip flows between sender and receiver.");
/*
------------------------------------------
MQTT Version Values
------------------------------------------
*/
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT),
"AUTO",
"Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker");
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1),
"v3.1.1");
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1),
"v3.1.0");
}

View File

@ -12,5 +12,5 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.mqtt.GetMQTT org.apache.nifi.processors.mqtt.ConsumeMQTT
org.apache.nifi.processors.mqtt.PutMQTT org.apache.nifi.processors.mqtt.PublishMQTT

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import io.moquette.proto.messages.PublishMessage;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
public class TestConsumeMQTT extends TestConsumeMqttCommon {
public MqttTestClient mqttTestClient;
public class UnitTestableConsumeMqtt extends ConsumeMQTT {
public UnitTestableConsumeMqtt(){
super();
}
@Override
public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber);
return mqttTestClient;
}
}
@Before
public void init() throws IOException {
PUBLISH_WAIT_MS = 0;
broker = "tcp://localhost:1883";
UnitTestableConsumeMqtt proc = new UnitTestableConsumeMqtt();
testRunner = TestRunners.newTestRunner(proc);
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
}
@After
public void tearDown() throws Exception {
if (MQTT_server != null) {
MQTT_server.stopServer();
}
final File folder = new File("./target");
final File[] files = folder.listFiles( new FilenameFilter() {
@Override
public boolean accept( final File dir,
final String name ) {
return name.matches( "moquette_store.mapdb.*" );
}
} );
for ( final File file : files ) {
if ( !file.delete() ) {
System.err.println( "Can't remove " + file.getAbsolutePath() );
}
}
}
@Override
public void internalPublish(PublishMessage publishMessage) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(publishMessage.getPayload().array());
mqttMessage.setRetained(publishMessage.isRetainFlag());
mqttMessage.setQos(publishMessage.getQos().ordinal());
try {
mqttTestClient.publish(publishMessage.getTopicName(), mqttMessage);
} catch (MqttException e) {
Assert.fail("Should never get an MqttException when publishing using test client");
}
}
}

View File

@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.processors.mqtt.GetMQTT;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestGetMQTT {
private TestRunner testRunner;
@Before
public void init() {
testRunner = TestRunners.newTestRunner(GetMQTT.class);
}
@Test
public void testProcessor() {
}
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
public class TestPublishMQTT extends TestPublishMqttCommon {
@Override
public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage;
assertEquals(Arrays.toString(payload), Arrays.toString(mqttQueueMessage.getPayload()));
assertEquals(qos, mqttQueueMessage.getQos());
assertEquals(retain, mqttQueueMessage.isRetained());
assertEquals(topic, mqttQueueMessage.getTopic());
}
public MqttTestClient mqttTestClient;
public class UnitTestablePublishMqtt extends PublishMQTT {
public UnitTestablePublishMqtt(){
super();
}
@Override
public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher);
return mqttTestClient;
}
}
@Before
public void init() throws IOException {
UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
testRunner = TestRunners.newTestRunner(proc);
testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
topic = "testTopic";
testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
}
@After
public void tearDown() throws Exception {
final File folder = new File("./target");
final File[] files = folder.listFiles( new FilenameFilter() {
@Override
public boolean accept( final File dir,
final String name ) {
return name.matches( "moquette_store.mapdb.*" );
}
} );
for ( final File file : files ) {
if ( !file.delete() ) {
System.err.println( "Can't remove " + file.getAbsolutePath() );
}
}
}
}

View File

@ -1,41 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.processors.mqtt.PutMQTT;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
public class TestPutMQTT {
private TestRunner testRunner;
@Before
public void init() {
testRunner = TestRunners.newTestRunner(PutMQTT.class);
}
@Test
public void testProcessor() {
}
}

View File

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import java.util.concurrent.atomic.AtomicBoolean;
public class MqttTestClient implements IMqttClient {
public String serverURI;
public String clientId;
public AtomicBoolean connected = new AtomicBoolean(false);
public MqttCallback mqttCallback;
public ConnectType type;
public enum ConnectType {Publisher, Subscriber}
public MQTTQueueMessage publishedMessage;
public String subscribedTopic;
public int subscribedQos;
public MqttTestClient(String serverURI, String clientId, ConnectType type) throws MqttException {
this.serverURI = serverURI;
this.clientId = clientId;
this.type = type;
}
@Override
public void connect() throws MqttSecurityException, MqttException {
connected.set(true);
}
@Override
public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {
connected.set(true);
}
@Override
public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException {
return null;
}
@Override
public void disconnect() throws MqttException {
connected.set(false);
}
@Override
public void disconnect(long quiesceTimeout) throws MqttException {
connected.set(false);
}
@Override
public void disconnectForcibly() throws MqttException {
connected.set(false);
}
@Override
public void disconnectForcibly(long disconnectTimeout) throws MqttException {
connected.set(false);
}
@Override
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException {
connected.set(false);
}
@Override
public void subscribe(String topicFilter) throws MqttException, MqttSecurityException {
subscribedTopic = topicFilter;
subscribedQos = -1;
}
@Override
public void subscribe(String[] topicFilters) throws MqttException {
throw new UnsupportedOperationException("Multiple topic filters is not supported");
}
@Override
public void subscribe(String topicFilter, int qos) throws MqttException {
subscribedTopic = topicFilter;
subscribedQos = qos;
}
@Override
public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
throw new UnsupportedOperationException("Multiple topic filters is not supported");
}
@Override
public void unsubscribe(String topicFilter) throws MqttException {
subscribedTopic = "";
subscribedQos = -2;
}
@Override
public void unsubscribe(String[] topicFilters) throws MqttException {
throw new UnsupportedOperationException("Multiple topic filters is not supported");
}
@Override
public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
switch (type) {
case Publisher:
publishedMessage = new MQTTQueueMessage(topic, message);
break;
case Subscriber:
try {
mqttCallback.messageArrived(topic, message);
} catch (Exception e) {
throw new MqttException(e);
}
break;
}
}
@Override
public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException {
switch (type) {
case Publisher:
publishedMessage = new MQTTQueueMessage(topic, message);
break;
case Subscriber:
try {
mqttCallback.messageArrived(topic, message);
} catch (Exception e) {
throw new MqttException(e);
}
break;
}
}
@Override
public void setCallback(MqttCallback callback) {
this.mqttCallback = callback;
}
@Override
public MqttTopic getTopic(String topic) {
return null;
}
@Override
public boolean isConnected() {
return connected.get();
}
@Override
public String getClientId() {
return clientId;
}
@Override
public String getServerURI() {
return serverURI;
}
@Override
public IMqttDeliveryToken[] getPendingDeliveryTokens() {
return new IMqttDeliveryToken[0];
}
@Override
public void close() throws MqttException {
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.ssl.StandardSSLContextService;
import java.util.HashMap;
import java.util.Map;
public class MqttTestUtils {
public static Map<String, String> createSslProperties() {
final Map<String, String> map = new HashMap<>();
map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks");
map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest");
map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS");
map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks");
map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest");
map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS");
return map;
}
}

View File

@ -0,0 +1,391 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.PublishMessage;
import io.moquette.server.Server;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.junit.Assert.assertTrue;
public abstract class TestConsumeMqttCommon {
public int PUBLISH_WAIT_MS = 1000;
public Server MQTT_server;
public TestRunner testRunner;
public String broker;
public abstract void internalPublish(PublishMessage publishMessage);
@Test
public void testLastWillConfig() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
testRunner.assertValid();
}
@Test
public void testQoS2() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS2NotCleanSession() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1NotCleanSession() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS0() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
testMessage.setRetainFlag(false);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() < 2);
if(flowFiles.size() == 1) {
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
}
@Test
public void testOnStoppedFinish() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
MqttMessage innerMessage = new MqttMessage();
innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
innerMessage.setQos(2);
MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage);
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();
Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
f.setAccessible(true);
LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
queue.add(testMessage);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
consumeMQTT.onStopped(testRunner.getProcessContext());
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testResizeBuffer() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
testRunner.assertValid();
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(false);
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
internalPublish(testMessage);
internalPublish(testMessage);
Thread.sleep(PUBLISH_WAIT_MS);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
testRunner.assertValid();
testRunner.run(1);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
f.setAccessible(true);
IMqttClient mqttClient = (IMqttClient) f.get(processor);
return mqttClient.isConnected();
}
public static void reconnect(ConsumeMQTT processor) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Method method = ConsumeMQTT.class.getDeclaredMethod("reconnect");
method.setAccessible(true);
method.invoke(processor);
}
}

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import io.moquette.server.Server;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.util.TestRunner;
import org.junit.Test;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
public abstract class TestPublishMqttCommon {
public Server MQTT_server;
public TestRunner testRunner;
public String topic;
public abstract void verifyPublishedMessage(byte[] payload, int qos, boolean retain);
@Test
public void testQoS0() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 0, false);
}
@Test
public void testQoS1() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 1, false);
}
@Test
public void testQoS2NotCleanSession() {
// Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
}
@Test
public void testQoS2() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
}
@Test
public void testRetainQoS2() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, true);
}
}

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.integration;
import io.moquette.BrokerConstants;
import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.PublishMessage;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;
import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
public class TestConsumeMQTT extends TestConsumeMqttCommon {
private void startServer() throws IOException {
MQTT_server = new Server();
final Properties configProps = new Properties();
configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
IConfig server_config = new MemoryConfig(configProps);
MQTT_server.startServer(server_config);
}
@Before
public void init() throws IOException {
startServer();
broker = "tcp://localhost:1883";
testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
}
@After
public void tearDown() throws Exception {
if (MQTT_server != null) {
MQTT_server.stopServer();
}
final File folder = new File("./target");
final File[] files = folder.listFiles( new FilenameFilter() {
@Override
public boolean accept( final File dir,
final String name ) {
return name.matches( "moquette_store.mapdb.*" );
}
} );
for ( final File file : files ) {
if ( !file.delete() ) {
System.err.println( "Can't remove " + file.getAbsolutePath() );
}
}
}
@Test
public void testRetainedQoS2() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(true);
internalPublish(testMessage);
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true");
}
@Override
public void internalPublish(PublishMessage publishMessage) {
MQTT_server.internalPublish(publishMessage);
}
}

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.integration;
import io.moquette.BrokerConstants;
import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.PublishMessage;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties;
public class TestConsumeMqttSSL extends TestConsumeMqttCommon {
private void startServer() throws IOException {
MQTT_server = new Server();
final Properties configProps = new Properties();
configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883");
configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/localhost-ks.jks");
configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "localtest");
configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "localtest");
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
IConfig server_config = new MemoryConfig(configProps);
MQTT_server.startServer(server_config);
}
@Before
public void init() throws IOException, InitializationException {
startServer();
broker = "ssl://localhost:8883";
testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
final StandardSSLContextService sslService = new StandardSSLContextService();
Map<String, String> sslProperties = createSslProperties();
testRunner.addControllerService("ssl-context", sslService, sslProperties);
testRunner.enableControllerService(sslService);
testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
}
@After
public void tearDown() throws Exception {
if (MQTT_server != null) {
MQTT_server.stopServer();
}
final File folder = new File("./target");
final File[] files = folder.listFiles( new FilenameFilter() {
@Override
public boolean accept( final File dir,
final String name ) {
return name.matches( "moquette_store.mapdb.*" );
}
} );
for ( final File file : files ) {
if ( !file.delete() ) {
System.err.println( "Can't remove " + file.getAbsolutePath() );
}
}
}
@Test
public void testRetainedQoS2() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
PublishMessage testMessage = new PublishMessage();
testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()));
testMessage.setTopicName("testTopic");
testMessage.setDupFlag(false);
testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
testMessage.setRetainFlag(true);
internalPublish(testMessage);
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true");
}
@Override
public void internalPublish(PublishMessage publishMessage) {
MQTT_server.internalPublish(publishMessage);
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.integration;
import io.moquette.BrokerConstants;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Properties;
import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon.reconnect;
public class TestPublishAndSubscribeMqttIntegration {
private TestRunner testSubscribeRunner;
private TestRunner testPublishRunner;
private Server MQTT_server;
private static int PUBLISH_WAIT_MS = 1000;
private void startServer() throws IOException {
MQTT_server = new Server();
final Properties configProps = new Properties();
configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
IConfig server_config = new MemoryConfig(configProps);
MQTT_server.startServer(server_config);
}
@Before
public void init() throws IOException {
startServer();
testSubscribeRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
testPublishRunner = TestRunners.newTestRunner(PublishMQTT.class);
}
@After
public void tearDown() throws Exception {
if (MQTT_server != null) {
MQTT_server.stopServer();
}
final File folder = new File("./target");
final File[] files = folder.listFiles( new FilenameFilter() {
@Override
public boolean accept( final File dir,
final String name ) {
return name.matches( "moquette_store.mapdb.*" );
}
} );
for ( final File file : files ) {
if ( !file.delete() ) {
System.err.println( "Can't remove " + file.getAbsolutePath() );
}
}
}
@Test
public void testBasic() throws Exception {
subscribe();
publishAndVerify();
Thread.sleep(PUBLISH_WAIT_MS);
testSubscribeRunner.run();
subscribeVerify();
}
private void publishAndVerify(){
testPublishRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
testPublishRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestPublishClient");
testPublishRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testPublishRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
testPublishRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
testPublishRunner.assertValid();
String testMessage = "testMessage";
testPublishRunner.enqueue(testMessage.getBytes());
testPublishRunner.run();
testPublishRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testPublishRunner.assertTransferCount(REL_SUCCESS, 1);
}
private void subscribe() throws IOException, ClassNotFoundException, MqttException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException {
testSubscribeRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
testSubscribeRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestSubscribeClient");
testSubscribeRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
testSubscribeRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testSubscribeRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
testSubscribeRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testSubscribeRunner.getProcessor();
consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext());
reconnect(consumeMQTT);
}
private void subscribeVerify(){
testSubscribeRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testSubscribeRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, "tcp://localhost:1883");
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.integration;
import io.moquette.BrokerConstants;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Properties;
import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
public class TestPublishMQTT extends TestPublishMqttCommon {
private void startServer() throws IOException {
MQTT_server = new Server();
final Properties configProps = new Properties();
configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
IConfig server_config = new MemoryConfig(configProps);
MQTT_server.startServer(server_config);
}
@Before
public void init() throws IOException {
startServer();
testRunner = TestRunners.newTestRunner(PublishMQTT.class);
testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
}
@After
public void tearDown() throws Exception {
if (MQTT_server != null) {
MQTT_server.stopServer();
}
final File folder = new File("./target");
final File[] files = folder.listFiles( new FilenameFilter() {
@Override
public boolean accept( final File dir,
final String name ) {
return name.matches( "moquette_store.mapdb.*" );
}
} );
for ( final File file : files ) {
if ( !file.delete() ) {
System.err.println( "Can't remove " + file.getAbsolutePath() );
}
}
}
@Override
public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
//Cannot verify published message without subscribing and consuming it which is outside the scope of this test.
}
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.integration;
import io.moquette.BrokerConstants;
import io.moquette.server.Server;
import io.moquette.server.config.IConfig;
import io.moquette.server.config.MemoryConfig;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Before;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME;
import static org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties;
public class TestPublishMqttSSL extends TestPublishMqttCommon {
private void startServer() throws IOException {
MQTT_server = new Server();
final Properties configProps = new Properties();
configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884");
configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883");
configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/localhost-ks.jks");
configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "localtest");
configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "localtest");
configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb");
IConfig server_config = new MemoryConfig(configProps);
MQTT_server.startServer(server_config);
}
@Before
public void init() throws IOException, InitializationException {
startServer();
testRunner = TestRunners.newTestRunner(PublishMQTT.class);
testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "ssl://localhost:8883");
testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic");
final StandardSSLContextService sslService = new StandardSSLContextService();
Map<String, String> sslProperties = createSslProperties();
testRunner.addControllerService("ssl-context", sslService, sslProperties);
testRunner.enableControllerService(sslService);
testRunner.setProperty(PublishMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
}
@After
public void tearDown() throws Exception {
if (MQTT_server != null) {
MQTT_server.stopServer();
}
final File folder = new File("./target");
final File[] files = folder.listFiles(new FilenameFilter() {
@Override
public boolean accept(final File dir,
final String name) {
return name.matches("moquette_store.mapdb.*");
}
});
for (final File file : files) {
if (!file.delete()) {
System.err.println("Can't remove " + file.getAbsolutePath());
}
}
}
@Override
public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
//Cannot verify published message without subscribing and consuming it which is outside the scope of this test.
}
}