From 23b04ae96863fc3b0fd3b268bcb7d155edf17f26 Mon Sep 17 00:00:00 2001 From: Justin Miller Date: Fri, 8 Nov 2019 11:34:57 -0600 Subject: [PATCH] NIFI-6856 - Make client ID a non-required field for the MQTTConsume and MQTTProduce processors. Generates a random ID if not set. Also add group ID field to ConsumeMQTT processor. Allows consumer to join consumer group at $share// add expression language support for the MQTT client ID Setting client id in publish test fails because it is not a flowfile attribute. Remove client id and autogenerate it when testing. Since the evaluation is done in onScheduled, there is no flow file available and we're not using the attributes to make the expression language evaluation. You can change the scope to use the Variable Registry. Co-Authored-By: Pierre Villard Signed-off-by: Pierre Villard This closes #3879. --- .../nifi/processors/mqtt/ConsumeMQTT.java | 22 ++++++++++++++++++- .../mqtt/common/AbstractMQTTProcessor.java | 13 ++++++++--- .../nifi/processors/mqtt/TestPublishMQTT.java | 1 - 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index f0cba7298b..94d53974dc 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -90,6 +90,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate"; public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained"; + public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder() + .name("Group ID") + .description("MQTT consumer group ID to use. If group ID not set, client will connect as individual consumer.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PROP_TOPIC_FILTER = new PropertyDescriptor.Builder() .name("Topic Filter") .description("The MQTT topic filter to designate the topics to subscribe to.") @@ -121,6 +128,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback private volatile long maxQueueSize; private volatile int qos; + private volatile String topicPrefix = ""; private volatile String topicFilter; private final AtomicBoolean scheduled = new AtomicBoolean(false); @@ -136,6 +144,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback static{ final List innerDescriptorsList = getAbstractPropertyDescriptors(); + innerDescriptorsList.add(PROP_GROUPID); innerDescriptorsList.add(PROP_TOPIC_FILTER); innerDescriptorsList.add(PROP_QOS); innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE); @@ -184,6 +193,12 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback .build()); } + final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet(); + final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet(); + if (clientIDSet && groupIDSet) { + results.add(new ValidationResult.Builder().subject("Client ID and Group ID").valid(false).explanation("if client ID is not unique, multiple nodes cannot join the consumer group").build()); + } + return results; } @@ -208,6 +223,11 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback qos = context.getProperty(PROP_QOS).asInteger(); maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong(); topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue(); + + if (context.getProperty(PROP_GROUPID).isSet()) { + topicPrefix = "$share/" + context.getProperty(PROP_GROUPID).getValue() + "/"; + } + scheduled.set(true); } @@ -266,7 +286,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback if (!mqttClient.isConnected()) { logger.debug("Connecting client"); mqttClient.connect(connOpts); - mqttClient.subscribe(topicFilter, qos); + mqttClient.subscribe(topicPrefix + topicFilter, qos); } } catch (MqttException e) { logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e); diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java index a1e65f33e7..34c3e1fc71 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java @@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; +import java.util.UUID; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE; @@ -122,8 +124,9 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder() .name("Client ID") - .description("MQTT client ID to use") - .required(true) + .description("MQTT client ID to use. If not set, a UUID will be generated.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); @@ -297,7 +300,11 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces protected void onScheduled(final ProcessContext context){ broker = context.getProperty(PROP_BROKER_URI).getValue(); - clientID = context.getProperty(PROP_CLIENTID).getValue(); + clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue(); + + if (clientID == null) { + clientID = UUID.randomUUID().toString(); + } connOpts = new MqttConnectOptions(); connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean()); diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java index 9916408b30..9c886d2289 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java @@ -67,7 +67,6 @@ public class TestPublishMQTT extends TestPublishMqttCommon { UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt(); testRunner = TestRunners.newTestRunner(proc); testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883"); - testRunner.setProperty(PublishMQTT.PROP_CLIENTID, "TestClient"); testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false"); topic = "testTopic"; testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);