NIFI-7889 - ConsumeMQTT - use offer instead of add

This closes #4578.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Pierre Villard 2020-10-06 23:06:18 +02:00 committed by Peter Turcsanyi
parent 1e227ca643
commit c9d778a8ee
2 changed files with 43 additions and 9 deletions

View File

@ -19,6 +19,8 @@ package org.apache.nifi.processors.mqtt;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -58,6 +60,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
@ -82,6 +85,9 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
@WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."), @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."),
@WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " + @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " +
"on the topic.")}) "on the topic.")})
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single "
+ "instance of this processor. A high value for this property could represent a lot of data being stored in memory.")
public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
@ -118,15 +124,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_MAX_QUEUE_SIZE = new PropertyDescriptor.Builder()
.name("Max Queue Size") .name("Max Queue Size")
.description("The MQTT messages are always being sent to subscribers on a topic. If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this " + .description("The MQTT messages are always being sent to subscribers on a topic regardless of how frequently the processor is scheduled to run. If the 'Run Schedule' is "
"processor then a back up can occur. This property specifies the maximum number of messages this processor will hold in memory at one time.") + "significantly behind the rate at which the messages are arriving to this processor, then a back up can occur in the internal queue of this processor. This property "
+ "specifies the maximum number of messages this processor will hold in memory at one time in the internal queue. This data would be lost in case of a NiFi restart.")
.required(true) .required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
private volatile long maxQueueSize;
private volatile int qos; private volatile int qos;
private volatile String topicPrefix = ""; private volatile String topicPrefix = "";
private volatile String topicFilter; private volatile String topicFilter;
@ -217,11 +221,11 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
return descriptors; return descriptors;
} }
@Override
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) { public void onScheduled(final ProcessContext context) {
super.onScheduled(context); super.onScheduled(context);
qos = context.getProperty(PROP_QOS).asInteger(); qos = context.getProperty(PROP_QOS).asInteger();
maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
topicFilter = context.getProperty(PROP_TOPIC_FILTER).evaluateAttributeExpressions().getValue(); topicFilter = context.getProperty(PROP_TOPIC_FILTER).evaluateAttributeExpressions().getValue();
if (context.getProperty(PROP_GROUPID).isSet()) { if (context.getProperty(PROP_GROUPID).isSet()) {
@ -347,10 +351,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
} }
} }
if (mqttQueue.size() >= maxQueueSize){ if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1, TimeUnit.SECONDS)) {
throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run."); throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
} else {
mqttQueue.add(new MQTTQueueMessage(topic, message));
} }
} }

View File

@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ConsumeMQTT</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>The MQTT messages are always being sent to subscribers on a topic regardless of how frequently the processor is scheduled to run.
If the 'Run Schedule' is significantly behind the rate at which the messages are arriving to this processor, then a back up can occur
in the internal queue of this processor. Each time the processor is scheduled, the messages in the internal queue will be written to
FlowFiles. In case the internal queue is full, the MQTT client will try for up to 1 second to add the message into the internal queue.
If the internal queue is still full after this time, an exception saying that 'The subscriber queue is full' would be thrown, the
message would be dropped and the client would be disconnected. In case the QoS property is set to 0, the message would be lost. In
case the QoS property is set to 1 or 2, the message will be received after the client reconnects.</p>
</body>
</html>