From c9d778a8eedad64b243d28fa481fedbe21222e6e Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 6 Oct 2020 23:06:18 +0200 Subject: [PATCH] NIFI-7889 - ConsumeMQTT - use offer instead of add This closes #4578. Signed-off-by: Peter Turcsanyi --- .../nifi/processors/mqtt/ConsumeMQTT.java | 20 ++++++------ .../additionalDetails.html | 32 +++++++++++++++++++ 2 files changed, 43 insertions(+), 9 deletions(-) create mode 100644 nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/docs/org.apache.nifi.processors.mqtt.ConsumeMQTT/additionalDetails.html diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index 4fe2a9cf74..ded89fece5 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -19,6 +19,8 @@ package org.apache.nifi.processors.mqtt; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SystemResource; +import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; @@ -58,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; @@ -82,6 +85,9 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."), @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " + "on the topic.")}) +@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single " + + "instance of this processor. A high value for this property could represent a lot of data being stored in memory.") + public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; @@ -118,15 +124,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder() .name("Max Queue Size") - .description("The MQTT messages are always being sent to subscribers on a topic. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this " + - "processor then a back up can occur. This property specifies the maximum number of messages this processor will hold in memory at one time.") + .description("The MQTT messages are always being sent to subscribers on a topic regardless of how frequently the processor is scheduled to run. If the 'Run Schedule' is " + + "significantly behind the rate at which the messages are arriving to this processor, then a back up can occur in the internal queue of this processor. This property " + + "specifies the maximum number of messages this processor will hold in memory at one time in the internal queue. This data would be lost in case of a NiFi restart.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); - - private volatile long maxQueueSize; - private volatile int qos; private volatile String topicPrefix = ""; private volatile String topicFilter; @@ -217,11 +221,11 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback return descriptors; } + @Override @OnScheduled public void onScheduled(final ProcessContext context) { super.onScheduled(context); qos = context.getProperty(PROP_QOS).asInteger(); - maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong(); topicFilter = context.getProperty(PROP_TOPIC_FILTER).evaluateAttributeExpressions().getValue(); if (context.getProperty(PROP_GROUPID).isSet()) { @@ -347,10 +351,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback } } - if (mqttQueue.size() >= maxQueueSize){ + if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1, TimeUnit.SECONDS)) { throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); - } else { - mqttQueue.add(new MQTTQueueMessage(topic, message)); } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/docs/org.apache.nifi.processors.mqtt.ConsumeMQTT/additionalDetails.html b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/docs/org.apache.nifi.processors.mqtt.ConsumeMQTT/additionalDetails.html new file mode 100644 index 0000000000..7d3684908b --- /dev/null +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/resources/docs/org.apache.nifi.processors.mqtt.ConsumeMQTT/additionalDetails.html @@ -0,0 +1,32 @@ + + + + + + ConsumeMQTT + + + + +

The MQTT messages are always being sent to subscribers on a topic regardless of how frequently the processor is scheduled to run. + If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this processor, then a back up can occur + in the internal queue of this processor. Each time the processor is scheduled, the messages in the internal queue will be written to + FlowFiles. In case the internal queue is full, the MQTT client will try for up to 1 second to add the message into the internal queue. + If the internal queue is still full after this time, an exception saying that 'The subscriber queue is full' would be thrown, the + message would be dropped and the client would be disconnected. In case the QoS property is set to 0, the message would be lost. In + case the QoS property is set to 1 or 2, the message will be received after the client reconnects.

+ + \ No newline at end of file