From 2b9f2071ed111f461236900277976f91bc1ac029 Mon Sep 17 00:00:00 2001 From: Nandor Soma Abonyi Date: Sun, 12 Mar 2023 22:58:55 +0100 Subject: [PATCH] NIFI-11270 Refactoring of the overly Paho-specific MQTT interface This closes #7032. Signed-off-by: Peter Turcsanyi --- .../nifi/processors/mqtt/ConsumeMQTT.java | 21 +---- .../nifi/processors/mqtt/PublishMQTT.java | 23 +---- .../mqtt/adapters/HiveMqV5ClientAdapter.java | 15 +--- .../mqtt/adapters/PahoMqttClientAdapter.java | 89 +++++++++++++------ .../processors/mqtt/common/MqttClient.java | 10 +-- ...k.java => ReceivedMqttMessageHandler.java} | 12 ++- .../mqtt/common/MqttTestClient.java | 13 +-- 7 files changed, 84 insertions(+), 99 deletions(-) rename nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/{MqttCallback.java => ReceivedMqttMessageHandler.java} (80%) diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java index f0d529a233..02982600f7 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java @@ -43,7 +43,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; -import org.apache.nifi.processors.mqtt.common.MqttCallback; import org.apache.nifi.processors.mqtt.common.MqttException; import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; import org.apache.nifi.serialization.MalformedRecordException; @@ -104,7 +103,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL "on the topic.")}) @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single " + "instance of this processor. A high value for this property could represent a lot of data being stored in memory.") -public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { +public class ConsumeMQTT extends AbstractMQTTProcessor { public final static String RECORD_COUNT_KEY = "record.count"; public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; @@ -383,9 +382,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { // non-null but not connected, so we need to handle each case and only create a new client when it is null try { mqttClient = createMqttClient(); - mqttClient.setCallback(this); mqttClient.connect(); - mqttClient.subscribe(topicPrefix + topicFilter, qos); + mqttClient.subscribe(topicPrefix + topicFilter, qos, this::handleReceivedMessage); } catch (Exception e) { logger.error("Connection failed to {}. Yielding processor", clientProperties.getRawBrokerUris(), e); mqttClient = null; // prevent stucked processor when subscribe fails @@ -614,13 +612,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { return stringBuilder.toString(); } - @Override - public void connectionLost(Throwable cause) { - logger.error("Connection to {} lost", clientProperties.getRawBrokerUris(), cause); - } - - @Override - public void messageArrived(ReceivedMqttMessage message) { + private void handleReceivedMessage(ReceivedMqttMessage message) { if (logger.isDebugEnabled()) { byte[] payload = message.getPayload(); final String text = new String(payload, StandardCharsets.UTF_8); @@ -639,11 +631,4 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { throw new MqttException("Failed to process message arrived from topic " + message.getTopic()); } } - - @Override - public void deliveryComplete(String token) { - // Unlikely situation. Api uses the same callback for publisher and consumer as well. - // That's why we have this log message here to indicate something really messy thing happened. - logger.error("Received MQTT 'delivery complete' message to subscriber. Token: [{}]", token); - } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java index ffb9633549..13b18abb86 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java @@ -38,8 +38,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; -import org.apache.nifi.processors.mqtt.common.MqttCallback; -import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; @@ -74,7 +72,7 @@ import static java.util.Optional.ofNullable; @CapabilityDescription("Publishes a message to an MQTT topic") @SeeAlso({ConsumeMQTT.class}) @SystemResourceConsideration(resource = SystemResource.MEMORY) -public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { +public class PublishMQTT extends AbstractMQTTProcessor { public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor.Builder() .name("Topic") @@ -289,7 +287,6 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { // non-null but not connected, so we need to handle each case and only create a new client when it is null try { mqttClient = createMqttClient(); - mqttClient.setCallback(this); mqttClient.connect(); } catch (Exception e) { logger.error("Connection failed to {}. Yielding processor", clientProperties.getRawBrokerUris(), e); @@ -297,24 +294,6 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback { } } - @Override - public void connectionLost(Throwable cause) { - logger.error("Connection to {} lost", clientProperties.getRawBrokerUris(), cause); - } - - @Override - public void messageArrived(ReceivedMqttMessage message) { - // Unlikely situation. Api uses the same callback for publisher and consumer as well. - // That's why we have this log message here to indicate something really messy thing happened. - logger.error("Message arrived to a PublishMQTT processor { topic:'" + message.getTopic() + "; payload:" + Arrays.toString(message.getPayload()) + "}"); - } - - @Override - public void deliveryComplete(String token) { - // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application. - logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token); - } - interface ProcessStrategy { void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException; String getFailureTemplateMessage(); diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java index b44c18070a..4295f07502 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java @@ -24,12 +24,12 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect; import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.mqtt.common.MqttCallback; import org.apache.nifi.processors.mqtt.common.MqttClient; import org.apache.nifi.processors.mqtt.common.MqttClientProperties; import org.apache.nifi.processors.mqtt.common.MqttException; import org.apache.nifi.processors.mqtt.common.MqttProtocolScheme; import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; +import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessageHandler; import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; import org.apache.nifi.security.util.KeyStoreUtils; import org.apache.nifi.security.util.TlsException; @@ -50,8 +50,6 @@ public class HiveMqV5ClientAdapter implements MqttClient { private final MqttClientProperties clientProperties; private final ComponentLog logger; - private MqttCallback callback; - public HiveMqV5ClientAdapter(URI brokerUri, MqttClientProperties clientProperties, ComponentLog logger) throws TlsException { this.mqtt5BlockingClient = createClient(brokerUri, clientProperties, logger); this.clientProperties = clientProperties; @@ -124,9 +122,7 @@ public class HiveMqV5ClientAdapter implements MqttClient { } @Override - public void subscribe(String topicFilter, int qos) { - Objects.requireNonNull(callback, "callback should be set"); - + public void subscribe(String topicFilter, int qos, ReceivedMqttMessageHandler handler) { logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos); CompletableFuture futureAck = mqtt5BlockingClient.toAsync().subscribeWith() @@ -138,7 +134,7 @@ public class HiveMqV5ClientAdapter implements MqttClient { mqtt5Publish.getQos().getCode(), mqtt5Publish.isRetain(), mqtt5Publish.getTopic().toString()); - callback.messageArrived(receivedMessage); + handler.handleReceivedMessage(receivedMessage); }) .send(); @@ -152,11 +148,6 @@ public class HiveMqV5ClientAdapter implements MqttClient { } } - @Override - public void setCallback(MqttCallback callback) { - this.callback = callback; - } - private static Mqtt5BlockingClient createClient(URI brokerUri, MqttClientProperties clientProperties, ComponentLog logger) throws TlsException { logger.debug("Creating Mqtt v5 client"); diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java index 15b49fc208..fd723601a7 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java @@ -17,20 +17,22 @@ package org.apache.nifi.processors.mqtt.adapters; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.mqtt.common.MqttCallback; -import org.apache.nifi.processors.mqtt.common.MqttClient; import org.apache.nifi.processors.mqtt.common.MqttClientProperties; +import org.apache.nifi.processors.mqtt.common.MqttClient; import org.apache.nifi.processors.mqtt.common.MqttException; import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage; +import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessageHandler; import org.apache.nifi.processors.mqtt.common.StandardMqttMessage; import org.apache.nifi.security.util.TlsConfiguration; import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.net.URI; +import java.util.Arrays; import java.util.Properties; public class PahoMqttClientAdapter implements MqttClient { @@ -45,6 +47,7 @@ public class PahoMqttClientAdapter implements MqttClient { this.client = createClient(brokerUri, clientProperties, logger); this.clientProperties = clientProperties; this.logger = logger; + client.setCallback(new DefaultMqttCallback()); } @Override @@ -121,9 +124,11 @@ public class PahoMqttClientAdapter implements MqttClient { } @Override - public void subscribe(String topicFilter, int qos) { + public void subscribe(String topicFilter, int qos, ReceivedMqttMessageHandler handler) { logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos); + client.setCallback(new ConsumerMqttCallback(handler)); + try { client.subscribe(topicFilter, qos); } catch (org.eclipse.paho.client.mqttv3.MqttException e) { @@ -131,28 +136,6 @@ public class PahoMqttClientAdapter implements MqttClient { } } - @Override - public void setCallback(MqttCallback callback) { - client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallback() { - @Override - public void connectionLost(Throwable cause) { - callback.connectionLost(cause); - } - - @Override - public void messageArrived(String topic, MqttMessage message) { - logger.debug("Message arrived with id: {}", message.getId()); - final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic); - callback.messageArrived(receivedMessage); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - callback.deliveryComplete(token.toString()); - } - }); - } - public static Properties transformSSLContextService(TlsConfiguration tlsConfiguration) { final Properties properties = new Properties(); if (tlsConfiguration.getProtocol() != null) { @@ -176,7 +159,7 @@ public class PahoMqttClientAdapter implements MqttClient { if (tlsConfiguration.getTruststoreType() != null) { properties.setProperty("com.ibm.ssl.trustStoreType", tlsConfiguration.getTruststoreType().getType()); } - return properties; + return properties; } private static org.eclipse.paho.client.mqttv3.MqttClient createClient(URI brokerUri, MqttClientProperties clientProperties, ComponentLog logger) { @@ -189,4 +172,58 @@ public class PahoMqttClientAdapter implements MqttClient { } } + /** + * Paho API uses the same callback for the publisher and consumer as well. + * Because of that, DefaultMqttCallback sets some reasonable default logs + * to make it easier to track misconfiguration errors. + *

+ * In case of subscribing clients messageArrived needs to be overridden. + */ + private class DefaultMqttCallback implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + logger.error("Connection to [{}] lost", clientProperties.getRawBrokerUris(), cause); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + // Unlikely situation. The Paho api uses the same callback for publisher and consumer as well. That's why + // we have this log message here to indicate something messy thing happened because we don't expect to + // receive messages until the client is not subscribed and the callback is not changed to ConsumerMqttCallback. + logger.error("MQTT message arrived [topic:{}; payload:{}]", topic, Arrays.toString(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token); + } + } + + /** + * Subscriber specific implementation of MqttCallback + */ + private class ConsumerMqttCallback extends DefaultMqttCallback { + + private final ReceivedMqttMessageHandler handler; + + private ConsumerMqttCallback(ReceivedMqttMessageHandler handler) { + this.handler = handler; + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + logger.debug("Message arrived. Id: [{}]", message.getId()); + final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic); + handler.handleReceivedMessage(receivedMessage); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // Unlikely situation. The Paho api uses the same callback for publisher and consumer as well. That's why + // we have this log message here to indicate something messy thing happened because we don't expect to + // receive 'delivery complete' messages while the client is subscribed. + logger.error("Received MQTT 'delivery complete' message to a subscribed client. Token: [{}]", token); + } + } } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java index f21d3e9242..2b5c949531 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java @@ -57,13 +57,7 @@ public interface MqttClient { * published at a lower quality of service will be received at the published * QoS. Messages published at a higher quality of service will be received using * the QoS specified on the subscribe. + * @param handler that further processes the message received by the client */ - void subscribe(String topicFilter, int qos); - - /** - * Sets a callback listener to use for events that happen asynchronously. - * - * @param callback for matching events - */ - void setCallback(MqttCallback callback); + void subscribe(String topicFilter, int qos, ReceivedMqttMessageHandler handler); } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessageHandler.java similarity index 80% rename from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java rename to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessageHandler.java index a890616f5c..6a5c75b119 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessageHandler.java @@ -16,8 +16,12 @@ */ package org.apache.nifi.processors.mqtt.common; -public interface MqttCallback { - void connectionLost(Throwable cause); - void messageArrived(ReceivedMqttMessage message); - void deliveryComplete(String token); +public interface ReceivedMqttMessageHandler { + + /** + * Handler to process received MQTT message + * + * @param message to process + */ + void handleReceivedMessage(ReceivedMqttMessage message); } diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java index dcb87b612c..263f3c55d3 100644 --- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java +++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java @@ -29,14 +29,13 @@ public class MqttTestClient implements MqttClient { public AtomicBoolean connected = new AtomicBoolean(false); - public MqttCallback mqttCallback; public ConnectType type; public enum ConnectType {Publisher, Subscriber} public String subscribedTopic; public int subscribedQos; - + public ReceivedMqttMessageHandler receivedMqttMessageHandler; public MqttTestClient(ConnectType type) { this.type = type; } @@ -68,20 +67,16 @@ public class MqttTestClient implements MqttClient { publishedMessages.add(Pair.of(topic, message)); break; case Subscriber: - mqttCallback.messageArrived(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic)); + receivedMqttMessageHandler.handleReceivedMessage(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic)); break; } } @Override - public void subscribe(String topicFilter, int qos) { + public void subscribe(String topicFilter, int qos, ReceivedMqttMessageHandler handler) { subscribedTopic = topicFilter; subscribedQos = qos; - } - - @Override - public void setCallback(MqttCallback callback) { - this.mqttCallback = callback; + receivedMqttMessageHandler = handler; } public Pair getLastPublished() {