diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml index 158e80fd7a..9ba9dd3197 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/pom.xml @@ -27,6 +27,11 @@ true + + org.apache.nifi + nifi-standard-services-api-nar + nar + org.apache.nifi nifi-mqtt-processors diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml index 9f4e9563d2..9561517f36 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml @@ -71,16 +71,40 @@ javax.websocket javax.websocket-api 1.1 - - - org.glassfish.tyrus.bundles - tyrus-standalone-client-jdk - 1.12 - org.glassfish.tyrus - tyrus-container-grizzly-client - 1.12 + org.apache.nifi + nifi-ssl-context-service-api + + + io.moquette + moquette-broker + 0.8.1 + test + + + org.apache.nifi + nifi-ssl-context-service + test + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + **/integration/TestConsumeMQTT.java + **/integration/TestConsumeMqttSSL.java + **/integration/TestPublishAndSubscribeMqttIntegration.java + **/integration/TestPublishMQTT.java + **/integration/TestPublishMqttSSL.java + + + + + diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java new file mode 100644 index 0000000000..aa87381cd8 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.OutputStreamCallback; + +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.io.OutputStream; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2; + + +@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection +@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker") +@SeeAlso({PublishMQTT.class}) +@WritesAttributes({ + @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was the message source"), + @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which message was received"), + @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service for this message."), + @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."), + @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " + + "on the topic.")}) +public class ConsumeMQTT extends AbstractMQTTProcessor { + + public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; + public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic"; + public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos"; + public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate"; + public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained"; + + public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder() + .name("Topic Filter") + .description("The MQTT topic filter to designate the topics to subscribe to.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder() + .name("Quality of Service(QoS)") + .description("The Quality of Service(QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.") + .required(true) + .defaultValue(ALLOWABLE_VALUE_QOS_0.getValue()) + .allowableValues( + ALLOWABLE_VALUE_QOS_0, + ALLOWABLE_VALUE_QOS_1, + ALLOWABLE_VALUE_QOS_2) + .build(); + + public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder() + .name("Max Queue Size") + .description("The MQTT messages are always being sent to subscribers on a topic. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this " + + "processor then a back up can occur. This property specifies the maximum number of messages this processor will hold in memory at one time.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + + private static int DISCONNECT_TIMEOUT = 5000; + private volatile long maxQueueSize; + + private volatile int qos; + private volatile String topicFilter; + private final AtomicBoolean scheduled = new AtomicBoolean(false); + + private volatile LinkedBlockingQueue mqttQueue; + + public static final Relationship REL_MESSAGE = new Relationship.Builder() + .name("Message") + .description("The MQTT message output") + .build(); + + private static final List descriptors; + private static final Set relationships; + + static{ + final List innerDescriptorsList = getAbstractPropertyDescriptors(); + innerDescriptorsList.add(PROP_TOPIC_FILTER); + innerDescriptorsList.add(PROP_QOS); + innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE); + descriptors = Collections.unmodifiableList(innerDescriptorsList); + + final Set innerRelationshipsSet = new HashSet(); + innerRelationshipsSet.add(REL_MESSAGE); + relationships = Collections.unmodifiableSet(innerRelationshipsSet); + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + // resize the receive buffer, but preserve data + if (descriptor == PROP_MAX_QUEUE_SIZE) { + // it's a mandatory integer, never null + int newSize = Integer.valueOf(newValue); + if (mqttQueue != null) { + int msgPending = mqttQueue.size(); + if (msgPending > newSize) { + logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", + new Object[]{newSize, msgPending}); + return; + } + LinkedBlockingQueue newBuffer = new LinkedBlockingQueue<>(newSize); + mqttQueue.drainTo(newBuffer); + mqttQueue = newBuffer; + } + + } + } + + @Override + public Collection customValidate(ValidationContext context) { + final Collection results = super.customValidate(context); + int newSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger(); + if (mqttQueue == null) { + mqttQueue = new LinkedBlockingQueue<>(context.getProperty(PROP_MAX_QUEUE_SIZE).asInteger()); + } + int msgPending = mqttQueue.size(); + if (msgPending > newSize) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject("ConsumeMQTT Configuration") + .explanation(String.format("%s (%d) is smaller than the number of messages pending (%d).", + PROP_MAX_QUEUE_SIZE.getDisplayName(), newSize, msgPending)) + .build()); + } + + return results; + } + + @Override + protected void init(final ProcessorInitializationContext context) { + logger = getLogger(); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException { + qos = context.getProperty(PROP_QOS).asInteger(); + maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong(); + topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue(); + + buildClient(context); + scheduled.set(true); + } + + @OnUnscheduled + public void onUnscheduled(final ProcessContext context) { + scheduled.set(false); + + mqttClientConnectLock.writeLock().lock(); + try { + if(isConnected()) { + mqttClient.disconnect(DISCONNECT_TIMEOUT); + logger.info("Disconnected the MQTT client."); + } + } catch(MqttException me) { + logger.error("Failed when disconnecting the MQTT client.", me); + } finally { + mqttClientConnectLock.writeLock().unlock(); + } + } + + + @OnStopped + public void onStopped(final ProcessContext context) throws IOException { + if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) { + logger.info("Finishing processing leftover messages"); + ProcessSession session = processSessionFactory.createSession(); + transferQueue(session); + } else { + if (mqttQueue!= null && !mqttQueue.isEmpty()){ + throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " + + "clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages " + + "in the MQTT internal queue cannot finish processing until until the processor is triggered to run."); + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (mqttQueue.isEmpty() && !isConnected() && scheduled.get()){ + logger.info("Queue is empty and client is not connected. Attempting to reconnect."); + + try { + reconnect(); + } catch (MqttException e) { + logger.error("Connection to " + broker + " lost (or was never connected) and ontrigger connect failed. Yielding processor", e); + context.yield(); + } + } + + if (mqttQueue.isEmpty()) { + return; + } + + transferQueue(session); + } + + private void transferQueue(ProcessSession session){ + while (!mqttQueue.isEmpty()) { + FlowFile messageFlowfile = session.create(); + final MQTTQueueMessage mqttMessage = mqttQueue.peek(); + + Map attrs = new HashMap<>(); + attrs.put(BROKER_ATTRIBUTE_KEY, broker); + attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic()); + attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos())); + attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate())); + attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained())); + + messageFlowfile = session.putAllAttributes(messageFlowfile, attrs); + + messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(mqttMessage.getPayload()); + } + }); + + String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString(); + session.getProvenanceReporter().receive(messageFlowfile, transitUri); + session.transfer(messageFlowfile, REL_MESSAGE); + mqttQueue.remove(mqttMessage); + session.commit(); + } + } + + private class ConsumeMQTTCallback implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + logger.warn("Connection to " + broker + " lost", cause); + try { + reconnect(); + } catch (MqttException e) { + logger.error("Connection to " + broker + " lost and callback re-connect failed."); + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + if (logger.isInfoEnabled()) { + logger.info("MQTT message arrived on topic:" + topic); + } + + if (mqttQueue.size() >= maxQueueSize){ + throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); + } else { + mqttQueue.add(new MQTTQueueMessage(topic, message)); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + logger.warn("Received MQTT 'delivery complete' message to subscriber:"+ token); + } + } + + private void reconnect() throws MqttException { + mqttClientConnectLock.writeLock().lock(); + try { + if (!mqttClient.isConnected()) { + setAndConnectClient(new ConsumeMQTTCallback()); + mqttClient.subscribe(topicFilter, qos); + } + } finally { + mqttClientConnectLock.writeLock().unlock(); + } + } + + private boolean isConnected(){ + return (mqttClient != null && mqttClient.isConnected()); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java deleted file mode 100644 index 1391317b3d..0000000000 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/GetMQTT.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.mqtt; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.*; -import org.apache.nifi.annotation.behavior.ReadsAttribute; -import org.apache.nifi.annotation.behavior.ReadsAttributes; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.OutputStreamCallback; - -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; -import java.io.OutputStream; -import java.io.IOException; - -@Tags({"GetMQTT"}) -@CapabilityDescription("Gets messages from an MQTT broker") -@SeeAlso({}) -@ReadsAttributes({@ReadsAttribute(attribute="", description="")}) -@WritesAttributes({@WritesAttribute(attribute="broker", description="MQTT broker that was the message source"), - @WritesAttribute(attribute="topic", description="MQTT topic on which message was received")}) -public class GetMQTT extends AbstractProcessor implements MqttCallback { - - String topic; - String broker; - String clientID; - double lastTime; - boolean firstTime = true; - - MemoryPersistence persistence = new MemoryPersistence(); - MqttClient mqttClient; - - LinkedBlockingQueue mqttQueue = new LinkedBlockingQueue<>(); - - public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor - .Builder().name("Broker address") - .description("MQTT broker address (e.g. tcp://localhost:1883)") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor PROPERTY_MQTT_TOPIC = new PropertyDescriptor - .Builder().name("MQTT topic") - .description("MQTT topic to subscribe to") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor - .Builder().name("MQTT client ID") - .description("MQTT client ID to use") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final Relationship RELATIONSHIP_MQTTMESSAGE = new Relationship.Builder() - .name("MQTTMessage") - .description("MQTT message output") - .build(); - - private List descriptors; - - private Set relationships; - - @Override - public void connectionLost(Throwable t) { - getLogger().info("Connection to " + broker + " lost"); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - mqttQueue.add(new MQTTQueueMessage(topic, message.getPayload())); - } - - @Override - protected void init(final ProcessorInitializationContext context) { - final List descriptors = new ArrayList(); - descriptors.add(PROPERTY_BROKER_ADDRESS); - descriptors.add(PROPERTY_MQTT_TOPIC); - descriptors.add(PROPERTY_MQTT_CLIENTID); - - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set relationships = new HashSet(); - relationships.add(RELATIONSHIP_MQTTMESSAGE); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set getRelationships() { - return this.relationships; - } - - @Override - public final List getSupportedPropertyDescriptors() { - return descriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - try { - broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue(); - topic = context.getProperty(PROPERTY_MQTT_TOPIC).getValue(); - clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue(); - mqttClient = new MqttClient(broker, clientID, persistence); - MqttConnectOptions connOpts = new MqttConnectOptions(); - mqttClient.setCallback(this); - connOpts.setCleanSession(true); - getLogger().info("Connecting to broker: " + broker); - mqttClient.connect(connOpts); - mqttClient.subscribe(topic, 0); - } catch(MqttException me) { - getLogger().error("msg "+me.getMessage()); - } - } - - @OnUnscheduled - public void onUnscheduled(final ProcessContext context) { - try { - mqttClient.disconnect(); - } catch(MqttException me) { - - } - getLogger().error("Disconnected"); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final List messageList = new LinkedList(); - - mqttQueue.drainTo(messageList); - if (messageList.isEmpty()) - return; - - Iterator iterator = messageList.iterator(); - while (iterator.hasNext()) { - FlowFile messageFlowfile = session.create(); - final MQTTQueueMessage m = (MQTTQueueMessage)iterator.next(); - - messageFlowfile = session.putAttribute(messageFlowfile, "broker", broker); - messageFlowfile = session.putAttribute(messageFlowfile, "topic", topic); - messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { - - @Override - public void process(final OutputStream out) throws IOException { - out.write(m.message); - } - }); - session.transfer(messageFlowfile, RELATIONSHIP_MQTTMESSAGE); - session.commit(); - } - } -} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java new file mode 100644 index 0000000000..95bbde4d4f --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; + +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; +import org.apache.nifi.stream.io.StreamUtils; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.io.InputStream; +import java.io.IOException; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"publish", "MQTT", "IOT"}) +@CapabilityDescription("Publishes a message to an MQTT topic") +@SeeAlso({ConsumeMQTT.class}) +public class PublishMQTT extends AbstractMQTTProcessor { + + public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor.Builder() + .name("Topic") + .description("The topic to publish the message to.") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder() + .name("Quality of Service(QoS)") + .description("The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'. " + + "Expression language is allowed in order to support publishing messages with different QoS but the end value of the property must be either '0', '1' or '2'. ") + .required(true) + .expressionLanguageSupported(true) + .addValidator(QOS_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_RETAIN = new PropertyDescriptor.Builder() + .name("Retain Message") + .description("Whether or not the retain flag should be set on the MQTT message.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(RETAIN_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are sent successfully to the destination are transferred to this relationship.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles that failed to send to the destination are transferred to this relationship.") + .build(); + + private static final List descriptors; + private static final Set relationships; + + static { + final List innerDescriptorsList = getAbstractPropertyDescriptors(); + innerDescriptorsList.add(PROP_TOPIC); + innerDescriptorsList.add(PROP_QOS); + innerDescriptorsList.add(PROP_RETAIN); + descriptors = Collections.unmodifiableList(innerDescriptorsList); + + final Set innerRelationshipsSet = new HashSet<>(); + innerRelationshipsSet.add(REL_SUCCESS); + innerRelationshipsSet.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(innerRelationshipsSet); + } + + @Override + protected void init(final ProcessorInitializationContext context) { + logger = getLogger(); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + buildClient(context); + } + + @OnStopped + public void onStop(final ProcessContext context) { + mqttClientConnectLock.writeLock().lock(); + try { + if (mqttClient != null && mqttClient.isConnected()) { + mqttClient.disconnect(); + logger.info("Disconnected the MQTT client."); + } + } catch(MqttException me) { + logger.error("Failed when disconnecting the MQTT client.", me); + } finally { + mqttClientConnectLock.writeLock().unlock(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowfile = session.get(); + if (flowfile == null) { + return; + } + + if(mqttClient == null || !mqttClient.isConnected()){ + logger.info("Was disconnected from client or was never connected, attempting to connect."); + try { + reconnect(); + } catch (MqttException e) { + context.yield(); + session.transfer(flowfile, REL_FAILURE); + logger.error("MQTT client is disconnected and re-connecting failed. Transferring FlowFile to fail and yielding", e); + return; + } + } + + // get the MQTT topic + String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue(); + + if (topic == null || topic.isEmpty()) { + logger.warn("Evaluation of the topic property returned null or evaluated to be empty, routing to failure"); + session.transfer(flowfile, REL_FAILURE); + return; + } + + // do the read + final byte[] messageContent = new byte[(int) flowfile.getSize()]; + session.read(flowfile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, messageContent, true); + } + }); + + int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger(); + final MqttMessage mqttMessage = new MqttMessage(messageContent); + mqttMessage.setQos(qos); + mqttMessage.setPayload(messageContent); + mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean()); + + try { + mqttClientConnectLock.readLock().lock(); + try { + /* + * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously: + * MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait()); + */ + mqttClient.publish(topic, mqttMessage); + } finally { + mqttClientConnectLock.readLock().unlock(); + } + session.transfer(flowfile, REL_SUCCESS); + } catch(MqttException me) { + logger.error("Failed to publish message.", me); + session.transfer(flowfile, REL_FAILURE); + } + } + + private class PublishMQTTCallback implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + logger.warn("Connection to " + broker + " lost", cause); + try { + reconnect(); + } catch (MqttException e) { + logger.error("Connection to " + broker + " lost and re-connect failed"); + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}"); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application. + logger.trace("Received 'delivery complete' message from broker for:" + token.toString()); + } + } + + + private void reconnect() throws MqttException { + mqttClientConnectLock.writeLock().lock(); + try { + if (!mqttClient.isConnected()) { + setAndConnectClient(new PublishMQTTCallback()); + getLogger().info("Connecting to broker: " + broker); + } + } finally { + mqttClientConnectLock.writeLock().unlock(); + } + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java deleted file mode 100644 index 29aeb10617..0000000000 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PutMQTT.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.mqtt; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.*; -import org.apache.nifi.annotation.behavior.ReadsAttribute; -import org.apache.nifi.annotation.behavior.ReadsAttributes; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnUnscheduled; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.commons.io.IOUtils; - -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; - -import java.util.*; -import java.util.concurrent.atomic.AtomicReference; -import java.io.InputStream; -import java.io.IOException; - -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"PutMQTT"}) -@CapabilityDescription("Publishes message to an MQTT topic") -@SeeAlso({}) -@ReadsAttributes({@ReadsAttribute(attribute="topic", description="Topic to publish message to")}) -@WritesAttributes({@WritesAttribute(attribute="", description="")}) -public class PutMQTT extends AbstractProcessor implements MqttCallback { - - String broker; - String clientID; - - MemoryPersistence persistence = new MemoryPersistence(); - MqttClient mqttClient; - - public static final PropertyDescriptor PROPERTY_BROKER_ADDRESS = new PropertyDescriptor - .Builder().name("Broker address") - .description("MQTT broker address (e.g. tcp://localhost:1883)") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor PROPERTY_MQTT_CLIENTID = new PropertyDescriptor - .Builder().name("MQTT client ID") - .description("MQTT client ID to use") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - private List descriptors; - - private Set relationships; - - @Override - public void connectionLost(Throwable t) { - getLogger().info("Connection to " + broker + " lost"); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - } - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { - } - - @Override - protected void init(final ProcessorInitializationContext context) { - final List descriptors = new ArrayList(); - descriptors.add(PROPERTY_BROKER_ADDRESS); - descriptors.add(PROPERTY_MQTT_CLIENTID); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set relationships = new HashSet(); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set getRelationships() { - return this.relationships; - } - - @Override - public final List getSupportedPropertyDescriptors() { - return descriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - try { - broker = context.getProperty(PROPERTY_BROKER_ADDRESS).getValue(); - clientID = context.getProperty(PROPERTY_MQTT_CLIENTID).getValue(); - mqttClient = new MqttClient(broker, clientID, persistence); - MqttConnectOptions connOpts = new MqttConnectOptions(); - mqttClient.setCallback(this); - connOpts.setCleanSession(true); - getLogger().info("Connecting to broker: " + broker); - mqttClient.connect(connOpts); - } catch(MqttException me) { - getLogger().error("msg "+me.getMessage()); - } - } - - @OnUnscheduled - public void onUnscheduled(final ProcessContext context) { - try { - mqttClient.disconnect(); - } catch(MqttException me) { - - } - getLogger().error("Disconnected"); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final AtomicReference message = new AtomicReference<>(); - - FlowFile flowfile = session.get(); - message.set(""); - - // get the MQTT topic - - String topic = flowfile.getAttribute("topic"); - - if (topic == null) { - getLogger().error("No topic attribute on flowfile"); - session.remove(flowfile); - return; - } - - // do the read - - session.read(flowfile, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - try{ - message.set(IOUtils.toString(in)); - }catch(Exception e){ - getLogger().error("Failed to read flowfile " + e.getMessage()); - } - } - }); - try { - session.remove(flowfile); - } catch (Exception e) { - getLogger().error("Failed to remove flowfile " + e.getMessage()); - return; - } - - String output = message.get(); - - if ((output == null) || output.isEmpty()) { - return; - } - - try { - mqttClient.publish(topic, output.getBytes(), 0, false); - } catch(MqttException me) { - getLogger().error("msg "+me.getMessage()); - } - } -} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java new file mode 100644 index 0000000000..733c240ea1 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_2; + +public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor { + + protected ComponentLog logger; + protected IMqttClient mqttClient; + protected final ReadWriteLock mqttClientConnectLock = new ReentrantReadWriteLock(true); + protected volatile String broker; + protected volatile String clientID; + protected MqttConnectOptions connOpts; + protected MemoryPersistence persistence = new MemoryPersistence(); + + public ProcessSessionFactory processSessionFactory; + + public static final Validator QOS_VALIDATOR = new Validator() { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + Integer inputInt = Integer.parseInt(input); + if (inputInt < 0 || inputInt > 2) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2").build(); + } + return new ValidationResult.Builder().subject(subject).valid(true).build(); + } + }; + + public static final Validator BROKER_VALIDATOR = new Validator() { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + try{ + URI brokerURI = new URI(input); + if (!"".equals(brokerURI.getPath())) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build(); + } + if (!("tcp".equals(brokerURI.getScheme()) || "ssl".equals(brokerURI.getScheme()))) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation("only the 'tcp' and 'ssl' schemes are supported.").build(); + } + } catch (URISyntaxException e) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build(); + } + return new ValidationResult.Builder().subject(subject).valid(true).build(); + } + }; + + public static final Validator RETAIN_VALIDATOR = new Validator() { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + if("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)){ + return new ValidationResult.Builder().subject(subject).valid(true).build(); + } else{ + return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false) + .validate(subject, input, context); + } + + } + }; + + public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder() + .name("Broker URI") + .description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp' and 'ssl' schemes are supported. In order to use 'ssl', the SSL Context " + + "Service property must be set.") + .required(true) + .addValidator(BROKER_VALIDATOR) + .build(); + + + public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder() + .name("Client ID") + .description("MQTT client ID to use") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("Username to use when connecting to the broker") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("Password to use when connecting to the broker") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + + public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder() + .name("Last Will Topic") + .description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_LAST_WILL_MESSAGE = new PropertyDescriptor.Builder() + .name("Last Will Message") + .description("The message to send as the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.") + .required(false) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_LAST_WILL_RETAIN = new PropertyDescriptor.Builder() + .name("Last Will Retain") + .description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.") + .required(false) + .allowableValues("true","false") + .build(); + + public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder() + .name("Last Will QoS Level") + .description("QoS level to be used when publishing the Last Will Message") + .required(false) + .allowableValues( + ALLOWABLE_VALUE_QOS_0, + ALLOWABLE_VALUE_QOS_1, + ALLOWABLE_VALUE_QOS_2 + ) + .build(); + + public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder() + .name("Session state") + .description("Whether to start afresh or resume previous flows. See the allowable value descriptions for more details.") + .required(true) + .allowableValues( + ALLOWABLE_VALUE_CLEAN_SESSION_TRUE, + ALLOWABLE_VALUE_CLEAN_SESSION_FALSE + ) + .defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue()) + .build(); + + public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder() + .name("MQTT Specification Version") + .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.") + .allowableValues( + ALLOWABLE_VALUE_MQTT_VERSION_AUTO, + ALLOWABLE_VALUE_MQTT_VERSION_311, + ALLOWABLE_VALUE_MQTT_VERSION_310 + ) + .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue()) + .required(true) + .build(); + + public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder() + .name("Connection Timeout (seconds)") + .description("Maximum time interval the client will wait for the network connection to the MQTT server " + + "to be established. The default timeout is 30 seconds. " + + "A value of 0 disables timeout processing meaning the client will wait until the network connection is made successfully or fails.") + .required(false) + .defaultValue("30") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_KEEP_ALIVE_INTERVAL = new PropertyDescriptor.Builder() + .name("Keep Alive Interval (seconds)") + .description("Defines the maximum time interval between messages sent or received. It enables the " + + "client to detect if the server is no longer available, without having to wait for the TCP/IP timeout. " + + "The client will ensure that at least one message travels across the network within each keep alive period. In the absence of a data-related message during the time period, " + + "the client sends a very small \"ping\" message, which the server will acknowledge. A value of 0 disables keepalive processing in the client.") + .required(false) + .defaultValue("60") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static List getAbstractPropertyDescriptors(){ + final List descriptors = new ArrayList(); + descriptors.add(PROP_BROKER_URI); + descriptors.add(PROP_CLIENTID); + descriptors.add(PROP_USERNAME); + descriptors.add(PROP_PASSWORD); + descriptors.add(PROP_SSL_CONTEXT_SERVICE); + descriptors.add(PROP_LAST_WILL_TOPIC); + descriptors.add(PROP_LAST_WILL_MESSAGE); + descriptors.add(PROP_LAST_WILL_RETAIN); + descriptors.add(PROP_LAST_WILL_QOS); + descriptors.add(PROP_CLEAN_SESSION); + descriptors.add(PROP_MQTT_VERSION); + descriptors.add(PROP_CONN_TIMEOUT); + descriptors.add(PROP_KEEP_ALIVE_INTERVAL); + return descriptors; + } + + @Override + public Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(1); + final boolean usernameSet = validationContext.getProperty(PROP_USERNAME).isSet(); + final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet(); + + if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) { + results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set").build()); + } + + final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet(); + final boolean lastWillMessageSet = validationContext.getProperty(PROP_LAST_WILL_MESSAGE).isSet(); + + final boolean lastWillRetainSet = validationContext.getProperty(PROP_LAST_WILL_RETAIN).isSet(); + final boolean lastWillQosSet = validationContext.getProperty(PROP_LAST_WILL_QOS).isSet(); + + // If any of the Last Will Properties are set + if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) { + // And any are not set + if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)){ + // Then mark as invalid + results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " + + "set, all must be set.").build()); + } + } + + try { + URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).getValue()); + if (brokerURI.getScheme().equalsIgnoreCase("ssl") && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) { + results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " + PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is used in " + + "the broker URI, the SSL Context Service must be set.").build()); + } + } catch (URISyntaxException e) { + results.add(new ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false).explanation("it is not valid URI syntax.").build()); + } + + return results; + } + + public static Properties transformSSLContextService(SSLContextService sslContextService){ + Properties properties = new Properties(); + properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm()); + properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile()); + properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword()); + properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType()); + properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile()); + properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword()); + properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType()); + return properties; + } + + protected void buildClient(ProcessContext context){ + try { + broker = context.getProperty(PROP_BROKER_URI).getValue(); + clientID = context.getProperty(PROP_CLIENTID).getValue(); + + connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean()); + connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger()); + connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger()); + connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger()); + + PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE); + if (sslProp.isSet()) { + Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService()); + connOpts.setSSLProperties(sslProps); + } + + PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC); + if (lastWillTopicProp.isSet()){ + String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue(); + PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN); + Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger(); + connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false); + } + + + PropertyValue usernameProp = context.getProperty(PROP_USERNAME); + if(usernameProp.isSet()) { + connOpts.setUserName(usernameProp.getValue()); + connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray()); + } + + mqttClientConnectLock.writeLock().lock(); + try{ + mqttClient = getMqttClient(broker, clientID, persistence); + + } finally { + mqttClientConnectLock.writeLock().unlock(); + } + } catch(MqttException me) { + logger.error("Failed to initialize the connection to the " + me.getMessage()); + } + } + + protected IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { + return new MqttClient(broker, clientID, persistence); + } + + + @Override + public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + if (processSessionFactory == null) { + processSessionFactory = sessionFactory; + } + ProcessSession session = sessionFactory.createSession(); + try { + onTrigger(context, session); + session.commit(); + } catch (final Throwable t) { + getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); + session.rollback(true); + throw t; + } + } + + public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; + + // Caller should obtain the necessary lock + protected void setAndConnectClient(MqttCallback mqttCallback) throws MqttException { + mqttClient = getMqttClient(broker, clientID, persistence); + mqttClient.setCallback(mqttCallback); + mqttClient.connect(connOpts); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java similarity index 50% rename from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java rename to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java index 0874b10e8b..d5e63c789b 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/MQTTQueueMessage.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java @@ -15,15 +15,43 @@ * limitations under the License. */ -package org.apache.nifi.processors.mqtt; +package org.apache.nifi.processors.mqtt.common; -public class MQTTQueueMessage -{ - public String topic; - public byte[] message; - - public MQTTQueueMessage(String topic, byte[] message) { +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class MQTTQueueMessage { + private String topic; + + private byte[] payload; + private int qos = 1; + private boolean retained = false; + private boolean duplicate = false; + + public MQTTQueueMessage(String topic, MqttMessage message) { this.topic = topic; - this.message = message; + payload = message.getPayload(); + qos = message.getQos(); + retained = message.isRetained(); + duplicate = message.isDuplicate(); + } + + public String getTopic() { + return topic; + } + + public byte[] getPayload() { + return payload; + } + + public int getQos() { + return qos; + } + + public boolean isRetained() { + return retained; + } + + public boolean isDuplicate() { + return duplicate; } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java new file mode 100644 index 0000000000..a29e6ff616 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import org.apache.nifi.components.AllowableValue; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; + +public class MqttConstants { + + /* + ------------------------------------------ + Clean Session Values + ------------------------------------------ + */ + + public static final AllowableValue ALLOWABLE_VALUE_CLEAN_SESSION_TRUE = + new AllowableValue("true", "Clean Session", "Client and Server discard any previous session and start a new " + + "one. This session lasts as long as the network connection. " + + "State data associated with this session is not reused in any subsequent session"); + + public static final AllowableValue ALLOWABLE_VALUE_CLEAN_SESSION_FALSE = + new AllowableValue("false", "Resume Session", "Server resumes communications with the client based on state from " + + "the current session (as identified by the ClientID). The client and server store the session after " + + "the client and server are disconnected. After the disconnection of a session that was not a clean session, " + + "the server stores further QoS 1 and QoS 2 messages that match any subscriptions that the client had at " + + "the time of disconnection as part of the session state"); + + /* + ------------------------------------------ + QoS Values + ------------------------------------------ + */ + + + public static final AllowableValue ALLOWABLE_VALUE_QOS_0 = + new AllowableValue("0", "0 - At most once", "Best effort delivery. A message won’t be acknowledged by the receiver or stored and redelivered by the sender. " + + "This is often called “fire and forget” and provides the same guarantee as the underlying TCP protocol."); + + public static final AllowableValue ALLOWABLE_VALUE_QOS_1 = + new AllowableValue("1", "1 - At least once", "Guarantees that a message will be delivered at least once to the receiver. " + + "The message can also be delivered more than once"); + + public static final AllowableValue ALLOWABLE_VALUE_QOS_2 = + new AllowableValue("2", "2 - Exactly once", "Guarantees that each message is received only once by the counterpart. It is the safest and also " + + "the slowest quality of service level. The guarantee is provided by two round-trip flows between sender and receiver."); + + + /* + ------------------------------------------ + MQTT Version Values + ------------------------------------------ + */ + public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO = + new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT), + "AUTO", + "Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker"); + + public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 = + new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1), + "v3.1.1"); + + public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 = + new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1), + "v3.1.0"); +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b5a30e9907..3dc2efae6e 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,5 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.mqtt.GetMQTT -org.apache.nifi.processors.mqtt.PutMQTT +org.apache.nifi.processors.mqtt.ConsumeMQTT +org.apache.nifi.processors.mqtt.PublishMQTT diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java new file mode 100644 index 0000000000..58c37e505a --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import io.moquette.proto.messages.PublishMessage; +import org.apache.nifi.processors.mqtt.common.MqttTestClient; +import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon; +import org.apache.nifi.util.TestRunners; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; + + +public class TestConsumeMQTT extends TestConsumeMqttCommon { + + + public MqttTestClient mqttTestClient; + + public class UnitTestableConsumeMqtt extends ConsumeMQTT { + + public UnitTestableConsumeMqtt(){ + super(); + } + + @Override + public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { + mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber); + return mqttTestClient; + } + } + + @Before + public void init() throws IOException { + PUBLISH_WAIT_MS = 0; + + broker = "tcp://localhost:1883"; + UnitTestableConsumeMqtt proc = new UnitTestableConsumeMqtt(); + testRunner = TestRunners.newTestRunner(proc); + testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker); + testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic"); + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Override + public void internalPublish(PublishMessage publishMessage) { + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setPayload(publishMessage.getPayload().array()); + mqttMessage.setRetained(publishMessage.isRetainFlag()); + mqttMessage.setQos(publishMessage.getQos().ordinal()); + + try { + mqttTestClient.publish(publishMessage.getTopicName(), mqttMessage); + } catch (MqttException e) { + Assert.fail("Should never get an MqttException when publishing using test client"); + } + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java deleted file mode 100644 index cd22735636..0000000000 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestGetMQTT.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.mqtt; - -import org.apache.nifi.processors.mqtt.GetMQTT; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Before; -import org.junit.Test; - - -public class TestGetMQTT { - - private TestRunner testRunner; - - @Before - public void init() { - testRunner = TestRunners.newTestRunner(GetMQTT.class); - } - - @Test - public void testProcessor() { - - } - -} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java new file mode 100644 index 0000000000..cdbc67ff25 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt; + +import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; +import org.apache.nifi.processors.mqtt.common.MqttTestClient; +import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; +import org.apache.nifi.util.TestRunners; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; + + +public class TestPublishMQTT extends TestPublishMqttCommon { + + @Override + public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { + MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage; + assertEquals(Arrays.toString(payload), Arrays.toString(mqttQueueMessage.getPayload())); + assertEquals(qos, mqttQueueMessage.getQos()); + assertEquals(retain, mqttQueueMessage.isRetained()); + assertEquals(topic, mqttQueueMessage.getTopic()); + } + + + public MqttTestClient mqttTestClient; + + public class UnitTestablePublishMqtt extends PublishMQTT { + + public UnitTestablePublishMqtt(){ + super(); + } + + @Override + public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException { + mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher); + return mqttTestClient; + } + } + + @Before + public void init() throws IOException { + UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt(); + testRunner = TestRunners.newTestRunner(proc); + testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false"); + topic = "testTopic"; + testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic); + } + + @After + public void tearDown() throws Exception { + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java deleted file mode 100644 index 64a0548975..0000000000 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPutMQTT.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.processors.mqtt; - -import org.apache.nifi.processors.mqtt.PutMQTT; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Before; -import org.junit.Test; - - -public class TestPutMQTT { - - private TestRunner testRunner; - - @Before - public void init() { - testRunner = TestRunners.newTestRunner(PutMQTT.class); - } - - @Test - public void testProcessor() { - - } - -} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java new file mode 100644 index 0000000000..81e2b18aa1 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; +import org.eclipse.paho.client.mqttv3.MqttSecurityException; +import org.eclipse.paho.client.mqttv3.MqttTopic; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class MqttTestClient implements IMqttClient { + + public String serverURI; + public String clientId; + + public AtomicBoolean connected = new AtomicBoolean(false); + + public MqttCallback mqttCallback; + public ConnectType type; + public enum ConnectType {Publisher, Subscriber} + + public MQTTQueueMessage publishedMessage; + + public String subscribedTopic; + public int subscribedQos; + + + public MqttTestClient(String serverURI, String clientId, ConnectType type) throws MqttException { + this.serverURI = serverURI; + this.clientId = clientId; + this.type = type; + } + + @Override + public void connect() throws MqttSecurityException, MqttException { + connected.set(true); + } + + @Override + public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException { + connected.set(true); + } + + @Override + public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException { + return null; + } + + @Override + public void disconnect() throws MqttException { + connected.set(false); + } + + @Override + public void disconnect(long quiesceTimeout) throws MqttException { + connected.set(false); + } + + @Override + public void disconnectForcibly() throws MqttException { + connected.set(false); + } + + @Override + public void disconnectForcibly(long disconnectTimeout) throws MqttException { + connected.set(false); + } + + @Override + public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException { + connected.set(false); + } + + @Override + public void subscribe(String topicFilter) throws MqttException, MqttSecurityException { + subscribedTopic = topicFilter; + subscribedQos = -1; + } + + @Override + public void subscribe(String[] topicFilters) throws MqttException { + throw new UnsupportedOperationException("Multiple topic filters is not supported"); + } + + @Override + public void subscribe(String topicFilter, int qos) throws MqttException { + subscribedTopic = topicFilter; + subscribedQos = qos; + } + + @Override + public void subscribe(String[] topicFilters, int[] qos) throws MqttException { + throw new UnsupportedOperationException("Multiple topic filters is not supported"); + } + + @Override + public void unsubscribe(String topicFilter) throws MqttException { + subscribedTopic = ""; + subscribedQos = -2; + } + + @Override + public void unsubscribe(String[] topicFilters) throws MqttException { + throw new UnsupportedOperationException("Multiple topic filters is not supported"); + } + + @Override + public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException { + MqttMessage message = new MqttMessage(payload); + message.setQos(qos); + message.setRetained(retained); + switch (type) { + case Publisher: + publishedMessage = new MQTTQueueMessage(topic, message); + break; + case Subscriber: + try { + mqttCallback.messageArrived(topic, message); + } catch (Exception e) { + throw new MqttException(e); + } + break; + } + } + + @Override + public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException { + switch (type) { + case Publisher: + publishedMessage = new MQTTQueueMessage(topic, message); + break; + case Subscriber: + try { + mqttCallback.messageArrived(topic, message); + } catch (Exception e) { + throw new MqttException(e); + } + break; + } + } + + @Override + public void setCallback(MqttCallback callback) { + this.mqttCallback = callback; + } + + @Override + public MqttTopic getTopic(String topic) { + return null; + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public String getClientId() { + return clientId; + } + + @Override + public String getServerURI() { + return serverURI; + } + + @Override + public IMqttDeliveryToken[] getPendingDeliveryTokens() { + return new IMqttDeliveryToken[0]; + } + + @Override + public void close() throws MqttException { + + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java new file mode 100644 index 0000000000..5373a9f61f --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtils.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import org.apache.nifi.ssl.StandardSSLContextService; + +import java.util.HashMap; +import java.util.Map; + +public class MqttTestUtils { + public static Map createSslProperties() { + + final Map map = new HashMap<>(); + map.put(StandardSSLContextService.KEYSTORE.getName(), "src/test/resources/localhost-ks.jks"); + map.put(StandardSSLContextService.KEYSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.KEYSTORE_TYPE.getName(), "JKS"); + map.put(StandardSSLContextService.TRUSTSTORE.getName(), "src/test/resources/localhost-ts.jks"); + map.put(StandardSSLContextService.TRUSTSTORE_PASSWORD.getName(), "localtest"); + map.put(StandardSSLContextService.TRUSTSTORE_TYPE.getName(), "JKS"); + return map; + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java new file mode 100644 index 0000000000..c3f1b3d775 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import io.moquette.proto.messages.AbstractMessage; +import io.moquette.proto.messages.PublishMessage; +import io.moquette.server.Server; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.eclipse.paho.client.mqttv3.IMqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; +import static org.junit.Assert.assertTrue; + +public abstract class TestConsumeMqttCommon { + + public int PUBLISH_WAIT_MS = 1000; + + public Server MQTT_server; + public TestRunner testRunner; + public String broker; + + public abstract void internalPublish(PublishMessage publishMessage); + + @Test + public void testLastWillConfig() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message"); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic"); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1"); + testRunner.assertNotValid(); + testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false"); + testRunner.assertValid(); + } + + + @Test + public void testQoS2() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testQoS2NotCleanSession() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + + @Test + public void testQoS1() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1"); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() > 0); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testQoS1NotCleanSession() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1"); + testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.LEAST_ONE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() > 0); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testQoS0() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0"); + + testRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.MOST_ONE); + testMessage.setRetainFlag(false); + + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + assertTrue(flowFiles.size() < 2); + + if(flowFiles.size() == 1) { + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + } + + @Test + public void testOnStoppedFinish() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + MqttMessage innerMessage = new MqttMessage(); + innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array()); + innerMessage.setQos(2); + MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory(); + + Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue"); + f.setAccessible(true); + LinkedBlockingQueue queue = (LinkedBlockingQueue) f.get(consumeMQTT); + queue.add(testMessage); + + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + consumeMQTT.onStopped(testRunner.getProcessContext()); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + @Test + public void testResizeBuffer() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2"); + + testRunner.assertValid(); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(false); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + assertTrue(isConnected(consumeMQTT)); + + internalPublish(testMessage); + internalPublish(testMessage); + + Thread.sleep(PUBLISH_WAIT_MS); + consumeMQTT.onUnscheduled(testRunner.getProcessContext()); + + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1"); + testRunner.assertNotValid(); + + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3"); + testRunner.assertValid(); + + testRunner.run(1); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } + + private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException { + Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient"); + f.setAccessible(true); + IMqttClient mqttClient = (IMqttClient) f.get(processor); + return mqttClient.isConnected(); + } + + + public static void reconnect(ConsumeMQTT processor) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException { + Method method = ConsumeMQTT.class.getDeclaredMethod("reconnect"); + method.setAccessible(true); + method.invoke(processor); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java new file mode 100644 index 0000000000..75df6f3cb6 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.common; + +import io.moquette.server.Server; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.util.TestRunner; +import org.junit.Test; + +import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS; +import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; + +public abstract class TestPublishMqttCommon { + + public Server MQTT_server; + public TestRunner testRunner; + public String topic; + + public abstract void verifyPublishedMessage(byte[] payload, int qos, boolean retain); + + @Test + public void testQoS0() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "0"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + + verifyPublishedMessage(testMessage.getBytes(), 0, false); + } + + @Test + public void testQoS1() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "1"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 1, false); + } + + @Test + public void testQoS2NotCleanSession() { + // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing + testRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 2, false); + } + + @Test + public void testQoS2() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 2, false); + } + + @Test + public void testRetainQoS2() { + testRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true"); + + testRunner.assertValid(); + + String testMessage = "testMessage"; + testRunner.enqueue(testMessage.getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + verifyPublishedMessage(testMessage.getBytes(), 2, true); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java new file mode 100644 index 0000000000..759bf96101 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.proto.messages.AbstractMessage; +import io.moquette.proto.messages.PublishMessage; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; + + +public class TestConsumeMQTT extends TestConsumeMqttCommon { + + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException { + startServer(); + + broker = "tcp://localhost:1883"; + testRunner = TestRunners.newTestRunner(ConsumeMQTT.class); + testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker); + testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic"); + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Test + public void testRetainedQoS2() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(true); + + internalPublish(testMessage); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true"); + } + + @Override + public void internalPublish(PublishMessage publishMessage) { + MQTT_server.internalPublish(publishMessage); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java new file mode 100644 index 0000000000..693c2a9cb3 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.proto.messages.AbstractMessage; +import io.moquette.proto.messages.PublishMessage; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties; + + +public class TestConsumeMqttSSL extends TestConsumeMqttCommon { + + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883"); + configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/localhost-ks.jks"); + configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException, InitializationException { + startServer(); + + broker = "ssl://localhost:8883"; + testRunner = TestRunners.newTestRunner(ConsumeMQTT.class); + testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker); + testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic"); + testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100"); + + final StandardSSLContextService sslService = new StandardSSLContextService(); + Map sslProperties = createSslProperties(); + testRunner.addControllerService("ssl-context", sslService, sslProperties); + testRunner.enableControllerService(sslService); + testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Test + public void testRetainedQoS2() throws Exception { + testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + + testRunner.assertValid(); + + PublishMessage testMessage = new PublishMessage(); + testMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes())); + testMessage.setTopicName("testTopic"); + testMessage.setDupFlag(false); + testMessage.setQos(AbstractMessage.QOSType.EXACTLY_ONCE); + testMessage.setRetainFlag(true); + + internalPublish(testMessage); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor(); + consumeMQTT.onScheduled(testRunner.getProcessContext()); + reconnect(consumeMQTT); + + Thread.sleep(PUBLISH_WAIT_MS); + + testRunner.run(1, false, false); + + testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "true"); + } + + @Override + public void internalPublish(PublishMessage publishMessage) { + MQTT_server.internalPublish(publishMessage); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java new file mode 100644 index 0000000000..a97ac98386 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.ConsumeMQTT; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; +import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS; +import static org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon.reconnect; + +public class TestPublishAndSubscribeMqttIntegration { + private TestRunner testSubscribeRunner; + private TestRunner testPublishRunner; + private Server MQTT_server; + + private static int PUBLISH_WAIT_MS = 1000; + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException { + startServer(); + testSubscribeRunner = TestRunners.newTestRunner(ConsumeMQTT.class); + testPublishRunner = TestRunners.newTestRunner(PublishMQTT.class); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Test + public void testBasic() throws Exception { + subscribe(); + publishAndVerify(); + Thread.sleep(PUBLISH_WAIT_MS); + testSubscribeRunner.run(); + subscribeVerify(); + } + + private void publishAndVerify(){ + testPublishRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testPublishRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestPublishClient"); + testPublishRunner.setProperty(PublishMQTT.PROP_QOS, "2"); + testPublishRunner.setProperty(PublishMQTT.PROP_RETAIN, "false"); + testPublishRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic"); + + testPublishRunner.assertValid(); + + String testMessage = "testMessage"; + testPublishRunner.enqueue(testMessage.getBytes()); + + testPublishRunner.run(); + + testPublishRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testPublishRunner.assertTransferCount(REL_SUCCESS, 1); + } + + private void subscribe() throws IOException, ClassNotFoundException, MqttException, InvocationTargetException, NoSuchMethodException, IllegalAccessException, NoSuchFieldException { + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestSubscribeClient"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_QOS, "2"); + testSubscribeRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100"); + + testSubscribeRunner.assertValid(); + + ConsumeMQTT consumeMQTT = (ConsumeMQTT) testSubscribeRunner.getProcessor(); + consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext()); + reconnect(consumeMQTT); + } + + private void subscribeVerify(){ + testSubscribeRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1); + + List flowFiles = testSubscribeRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE); + MockFlowFile flowFile = flowFiles.get(0); + + flowFile.assertContentEquals("testMessage"); + flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, "tcp://localhost:1883"); + flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic"); + flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2"); + flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false"); + flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java new file mode 100644 index 0000000000..5777825617 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMQTT.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; + + +public class TestPublishMQTT extends TestPublishMqttCommon { + + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException { + startServer(); + testRunner = TestRunners.newTestRunner(PublishMQTT.class); + testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); + testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false"); + testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles( new FilenameFilter() { + @Override + public boolean accept( final File dir, + final String name ) { + return name.matches( "moquette_store.mapdb.*" ); + } + } ); + for ( final File file : files ) { + if ( !file.delete() ) { + System.err.println( "Can't remove " + file.getAbsolutePath() ); + } + } + } + + @Override + public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { + //Cannot verify published message without subscribing and consuming it which is outside the scope of this test. + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java new file mode 100644 index 0000000000..6270d7a8b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishMqttSSL.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.mqtt.integration; + +import io.moquette.BrokerConstants; +import io.moquette.server.Server; +import io.moquette.server.config.IConfig; +import io.moquette.server.config.MemoryConfig; +import org.apache.nifi.processors.mqtt.PublishMQTT; +import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import static io.moquette.BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME; +import static org.apache.nifi.processors.mqtt.common.MqttTestUtils.createSslProperties; + + +public class TestPublishMqttSSL extends TestPublishMqttCommon { + + private void startServer() throws IOException { + MQTT_server = new Server(); + final Properties configProps = new Properties(); + + configProps.put(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, "1884"); + configProps.put(BrokerConstants.SSL_PORT_PROPERTY_NAME, "8883"); + configProps.put(BrokerConstants.JKS_PATH_PROPERTY_NAME, "src/test/resources/localhost-ks.jks"); + configProps.put(BrokerConstants.KEY_STORE_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.put(BrokerConstants.KEY_MANAGER_PASSWORD_PROPERTY_NAME, "localtest"); + configProps.setProperty(PERSISTENT_STORE_PROPERTY_NAME,"./target/moquette_store.mapdb"); + IConfig server_config = new MemoryConfig(configProps); + MQTT_server.startServer(server_config); + } + + @Before + public void init() throws IOException, InitializationException { + startServer(); + testRunner = TestRunners.newTestRunner(PublishMQTT.class); + testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "ssl://localhost:8883"); + testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient"); + testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true"); + testRunner.setProperty(PublishMQTT.PROP_TOPIC, "testTopic"); + + final StandardSSLContextService sslService = new StandardSSLContextService(); + Map sslProperties = createSslProperties(); + testRunner.addControllerService("ssl-context", sslService, sslProperties); + testRunner.enableControllerService(sslService); + testRunner.setProperty(PublishMQTT.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + } + + @After + public void tearDown() throws Exception { + if (MQTT_server != null) { + MQTT_server.stopServer(); + } + final File folder = new File("./target"); + final File[] files = folder.listFiles(new FilenameFilter() { + @Override + public boolean accept(final File dir, + final String name) { + return name.matches("moquette_store.mapdb.*"); + } + }); + for (final File file : files) { + if (!file.delete()) { + System.err.println("Can't remove " + file.getAbsolutePath()); + } + } + } + + @Override + public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) { + //Cannot verify published message without subscribing and consuming it which is outside the scope of this test. + } +} diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks new file mode 100755 index 0000000000..df36197d92 Binary files /dev/null and b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ks.jks differ diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks new file mode 100755 index 0000000000..7824378a32 Binary files /dev/null and b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/resources/localhost-ts.jks differ diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml index 456fae366a..5ce1ec1f28 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-mqtt-bundle/pom.xml @@ -33,5 +33,5 @@ 1.0.0-SNAPSHOT - +