diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index 3e9bed91ef..6810b8837a 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -93,5 +93,10 @@
nifi-schema-registry-service-api
test
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ test
+
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 46d1452cea..c9e318ac86 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -161,22 +161,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
- public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
- .name("record-reader")
- .displayName("Record Reader")
- .description("The Record Reader to use for received messages")
- .identifiesControllerService(RecordReaderFactory.class)
- .required(false)
- .build();
-
- public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
- .name("record-writer")
- .displayName("Record Writer")
- .description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
- .identifiesControllerService(RecordSetWriterFactory.class)
- .required(false)
- .build();
-
public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
.name("add-attributes-as-fields")
.displayName("Add attributes as fields")
@@ -201,6 +185,16 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
+ "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
.build();
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_READER)
+ .description("The Record Reader to use for parsing received MQTT Messages into Records.")
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_WRITER)
+ .description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
+ .build();
+
private volatile int qos;
private volatile String topicPrefix = "";
private volatile String topicFilter;
@@ -229,10 +223,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
innerDescriptorsList.add(PROP_TOPIC_FILTER);
innerDescriptorsList.add(PROP_QOS);
innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
- innerDescriptorsList.add(RECORD_READER);
- innerDescriptorsList.add(RECORD_WRITER);
innerDescriptorsList.add(ADD_ATTRIBUTES_AS_FIELDS);
innerDescriptorsList.add(MESSAGE_DEMARCATOR);
+ innerDescriptorsList.add(RECORD_READER);
+ innerDescriptorsList.add(RECORD_WRITER);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set innerRelationshipsSet = new HashSet<>();
@@ -292,16 +286,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
- final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
- if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
- results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
- .explanation("both Record Reader and Writer must be set when used.").build());
- }
-
final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
if (readerIsSet && demarcatorIsSet) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
- .explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
+ .explanation("message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
}
return results;
@@ -461,7 +449,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, ReceivedMqttMessage mqttMessage) {
FlowFile messageFlowfile = session.create();
- Map attrs = new HashMap<>();
+ final Map attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
@@ -476,7 +464,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- FlowFile flowFile = session.create();
+ final FlowFile flowFile = session.create();
session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
final Map attributes = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index 2ea80b1ded..86fa051f71 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -39,18 +39,30 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttCallback;
-import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
+import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Optional.ofNullable;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -86,6 +98,16 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
.addValidator(RETAIN_VALIDATOR)
.build();
+ public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_READER)
+ .description("The Record Reader to use for parsing the incoming FlowFile into Records.")
+ .build();
+
+ public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(BASE_RECORD_WRITER)
+ .description("The Record Writer to use for serializing Records before publishing them as an MQTT Message.")
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
@@ -103,6 +125,8 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
innerDescriptorsList.add(PROP_TOPIC);
innerDescriptorsList.add(PROP_QOS);
innerDescriptorsList.add(PROP_RETAIN);
+ innerDescriptorsList.add(RECORD_READER);
+ innerDescriptorsList.add(RECORD_WRITER);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set innerRelationshipsSet = new HashSet<>();
@@ -111,9 +135,17 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
+ static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
+ static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
+ static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
+
+ static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = ".mqtt.publish.failed.index";
+ private String publishFailedIndexAttributeName;
+
@Override
protected void init(final ProcessorInitializationContext context) {
logger = getLogger();
+ publishFailedIndexAttributeName = getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
}
@Override
@@ -162,28 +194,105 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
return;
}
- // do the read
- final byte[] messageContent = new byte[(int) flowfile.getSize()];
- session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true));
+ if (context.getProperty(RECORD_READER).isSet()) {
+ processRecordSet(context, session, flowfile, topic);
+ } else {
+ processStandardFlowFile(context, session, flowfile, topic);
+ }
+ }
+ private void processRecordSet(ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
+ final StopWatch stopWatch = new StopWatch(true);
+ final AtomicInteger processedRecords = new AtomicInteger();
+
+ try {
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ final Long previousProcessFailedAt = ofNullable(flowfile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
+
+ session.read(flowfile, in -> {
+ try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
+ final RecordSet recordSet = reader.createRecordSet();
+
+ final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+ Record record;
+ while ((record = recordSet.next()) != null) {
+ if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
+ processedRecords.getAndIncrement();
+ continue;
+ }
+
+ baos.reset();
+
+ try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
+ writer.write(record);
+ writer.flush();
+ }
+
+ final byte[] messageContent = baos.toByteArray();
+
+ publishMessage(context, flowfile, topic, messageContent);
+ processedRecords.getAndIncrement();
+ }
+ } catch (SchemaNotFoundException | MalformedRecordException e) {
+ throw new ProcessException("An error happened during creating components for serialization.", e);
+ }
+ });
+
+ FlowFile successFlowFile = flowfile;
+
+ String provenanceEventDetails;
+ if (previousProcessFailedAt != null) {
+ successFlowFile = session.removeAttribute(flowfile, publishFailedIndexAttributeName);
+ provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, processedRecords.get());
+ } else {
+ provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, processedRecords.get());
+ }
+
+ session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), provenanceEventDetails, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(successFlowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ logger.error("An error happened during publishing records. Routing to failure.", e);
+
+ FlowFile failedFlowFile = session.putAttribute(flowfile, publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
+
+ if (processedRecords.get() > 0) {
+ session.getProvenanceReporter().send(
+ failedFlowFile,
+ clientProperties.getBroker(),
+ String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords.get()),
+ stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ }
+
+ session.transfer(failedFlowFile, REL_FAILURE);
+ }
+ }
+
+ private void processStandardFlowFile(ProcessContext context, ProcessSession session, FlowFile flowfile, String topic) {
+ try {
+ final byte[] messageContent = new byte[(int) flowfile.getSize()];
+ session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true));
+
+ final StopWatch stopWatch = new StopWatch(true);
+ publishMessage(context, flowfile, topic, messageContent);
+ session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(flowfile, REL_SUCCESS);
+ } catch (Exception e) {
+ logger.error("An error happened during publishing a message. Routing to failure.", e);
+ session.transfer(flowfile, REL_FAILURE);
+ }
+ }
+
+ private void publishMessage(ProcessContext context, FlowFile flowfile, String topic, byte[] messageContent) {
int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
boolean retained = context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean();
final StandardMqttMessage mqttMessage = new StandardMqttMessage(messageContent, qos, retained);
- try {
- final StopWatch stopWatch = new StopWatch(true);
- /*
- * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
- * MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
- */
- mqttClient.publish(topic, mqttMessage);
-
- session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowfile, REL_SUCCESS);
- } catch (MqttException me) {
- logger.error("Failed to publish message.", me);
- session.transfer(flowfile, REL_FAILURE);
- }
+ mqttClient.publish(topic, mqttMessage);
}
private void initializeClient(ProcessContext context) {
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 8b8c186360..0fd799b8a5 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -32,6 +32,8 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.TlsException;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.ssl.SSLContextService;
import java.net.URI;
@@ -42,6 +44,7 @@ import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
import static org.apache.commons.lang3.StringUtils.EMPTY;
@@ -93,7 +96,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
};
private static String getSupportedSchemeList() {
- return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new));
+ return Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).collect(Collectors.joining(", "));
}
public static final Validator RETAIN_VALIDATOR = (subject, input, context) -> {
@@ -232,6 +235,20 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+ public static final PropertyDescriptor BASE_RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor BASE_RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(false)
+ .build();
+
public static List getAbstractPropertyDescriptors() {
final List descriptors = new ArrayList<>();
descriptors.add(PROP_BROKER_URI);
@@ -287,6 +304,13 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
results.add(new ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false).explanation("it is not valid URI syntax.").build());
}
+ final boolean readerIsSet = validationContext.getProperty(BASE_RECORD_READER).isSet();
+ final boolean writerIsSet = validationContext.getProperty(BASE_RECORD_WRITER).isSet();
+ if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
+ results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
+ .explanation("both Record Reader and Writer must be set when used.").build());
+ }
+
return results;
}
@@ -366,9 +390,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
- final PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
- if (sslProp.isSet()) {
- final SSLContextService sslContextService = (SSLContextService) sslProp.asControllerService();
+ final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
clientProperties.setTlsConfiguration(sslContextService.createTlsConfiguration());
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 81ea6d7db1..3cb1c2fedf 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -17,105 +17,575 @@
package org.apache.nifi.processors.mqtt;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
-import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import javax.net.ssl.SSLContext;
+import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestConsumeMQTT extends TestConsumeMqttCommon {
- private static TlsConfiguration tlsConfiguration;
+public class TestConsumeMQTT {
- public MqttTestClient mqttTestClient;
+ private static final int PUBLISH_WAIT_MS = 0;
+ private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
+ private static final String BROKER_URI = "tcp://localhost:1883";
+ private static final String CLIENT_ID = "TestClient";
+ private static final String TOPIC_NAME = "testTopic";
+ private static final String INTERNAL_QUEUE_SIZE = "100";
- public class UnitTestableConsumeMqtt extends ConsumeMQTT {
+ private static final String STRING_MESSAGE = "testMessage";
+ private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
- public UnitTestableConsumeMqtt(){
- super();
- }
+ private static final int AT_MOST_ONCE = 0;
+ private static final int AT_LEAST_ONCE = 1;
+ private static final int EXACTLY_ONCE = 2;
- @Override
- protected MqttClient createMqttClient() {
- mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
- return mqttTestClient;
+ private MqttTestClient mqttTestClient;
+ private TestRunner testRunner;
+
+ @AfterEach
+ public void cleanup() {
+ testRunner = null;
+ mqttTestClient = null;
+ }
+
+ @Test
+ public void testClientIDConfiguration() {
+ testRunner = initializeTestRunner();
+ testRunner.assertValid();
+
+ testRunner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
+ testRunner.assertValid();
+
+ testRunner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
+ testRunner.assertValid();
+ }
+
+ @Test
+ public void testLastWillConfig() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message");
+ testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic");
+ testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
+ testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
+ testRunner.assertValid();
+ }
+
+
+ @Test
+ public void testQoS2() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ testRunner.run(1, false, false);
+
+ testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+ assertProvenanceEvents(1);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ final MockFlowFile flowFile = flowFiles.get(0);
+
+ flowFile.assertContentEquals("testMessage");
+ flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+ flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+ flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+ flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+ flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+ }
+
+ @Test
+ public void testQoS2NotCleanSession() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+ testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ testRunner.run(1, false, false);
+
+ testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+ assertProvenanceEvents(1);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ final MockFlowFile flowFile = flowFiles.get(0);
+
+ flowFile.assertContentEquals("testMessage");
+ flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+ flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+ flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+ flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+ flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+ }
+
+ @Test
+ public void testQoS1() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(STRING_MESSAGE, AT_LEAST_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ testRunner.run(1, false, false);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ assertTrue(flowFiles.size() > 0);
+ assertProvenanceEvents(flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.get(0);
+
+ flowFile.assertContentEquals("testMessage");
+ flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+ flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+ flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
+ flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+ flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+ }
+
+ @Test
+ public void testQoS1NotCleanSession() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
+ testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+ publishMessage(STRING_MESSAGE, AT_LEAST_ONCE);
+
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ testRunner.run(1, false, false);
+
+ testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ assertTrue(flowFiles.size() > 0);
+ assertProvenanceEvents(flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.get(0);
+
+ flowFile.assertContentEquals("testMessage");
+ flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+ flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+ flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
+ flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+ flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+ }
+
+ @Test
+ public void testQoS0() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(STRING_MESSAGE, AT_MOST_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ testRunner.run(1, false, false);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ assertTrue(flowFiles.size() < 2);
+ assertProvenanceEvents(flowFiles.size());
+
+ if(flowFiles.size() == 1) {
+ MockFlowFile flowFile = flowFiles.get(0);
+
+ flowFile.assertContentEquals("testMessage");
+ flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+ flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+ flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
+ flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+ flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
}
- @BeforeAll
- public static void setTlsConfiguration() {
- tlsConfiguration = new TemporaryKeyStoreBuilder().build();
+ @Test
+ public void testOnStoppedFinish() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+
+ testRunner.assertValid();
+
+ final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array();
+ final ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, TOPIC_NAME);
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();
+
+ final Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
+ f.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ final LinkedBlockingQueue queue = (LinkedBlockingQueue) f.get(consumeMQTT);
+ queue.add(testMessage);
+
+ consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+ consumeMQTT.onStopped(testRunner.getProcessContext());
+
+ testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
+ assertProvenanceEvents(1);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ final MockFlowFile flowFile = flowFiles.get(0);
+
+ flowFile.assertContentEquals("testMessage");
+ flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+ flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+ flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+ flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+ flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
- @BeforeEach
- public void init() {
- PUBLISH_WAIT_MS = 0;
+ @Test
+ public void testResizeBuffer() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
- broker = "tcp://localhost:1883";
- UnitTestableConsumeMqtt proc = new UnitTestableConsumeMqtt();
- testRunner = TestRunners.newTestRunner(proc);
- testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
- testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
- testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
- testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
+ testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
+ testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+ publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+ consumeMQTT.onUnscheduled(testRunner.getProcessContext());
+
+ testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
+ testRunner.assertNotValid();
+
+ testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
+ testRunner.assertValid();
+
+ testRunner.run(1);
+
+ testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
+ assertProvenanceEvents(2);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ final MockFlowFile flowFile = flowFiles.get(0);
+
+ flowFile.assertContentEquals("testMessage");
+ flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
+ flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
+ flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
+ flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
+ flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
+ }
+
+ @Test
+ public void testConsumeRecordsWithAddedFields() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+ publishMessage(THIS_IS_NOT_JSON, AT_MOST_ONCE);
+ publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ testRunner.run(1, false, false);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ assertEquals(1, flowFiles.size());
+ assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+ + "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
+ new String(flowFiles.get(0).toByteArray()));
+
+ final List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+ assertEquals(1, badFlowFiles.size());
+ assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
+ }
+
+ @Test
+ public void testConsumeDemarcator() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+ publishMessage(THIS_IS_NOT_JSON, AT_MOST_ONCE);
+ publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ testRunner.run(1, false, false);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ assertEquals(flowFiles.size(), 1);
+ assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+ + THIS_IS_NOT_JSON + "\\n"
+ + "{\"name\":\"Apache NiFi\"}\\n",
+ new String(flowFiles.get(0).toByteArray()));
+
+ final List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+ assertEquals(0, badFlowFiles.size());
+ }
+
+ @Test
+ public void testConsumeRecordsWithoutAddedFields() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(JSON_PAYLOAD, AT_LEAST_ONCE);
+ publishMessage(THIS_IS_NOT_JSON, AT_LEAST_ONCE);
+ publishMessage(JSON_PAYLOAD, AT_LEAST_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ testRunner.run(1, false, false);
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
+ assertEquals(1, flowFiles.size());
+ assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
+
+ final List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+ assertEquals(1, badFlowFiles.size());
+ assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
+ }
+
+ @Test
+ public void testConsumeRecordsOnlyBadData() throws Exception {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
+
+ testRunner.assertValid();
+
+ final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
+ consumeMQTT.onScheduled(testRunner.getProcessContext());
+ reconnect(consumeMQTT, testRunner.getProcessContext());
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ assertTrue(isConnected(consumeMQTT));
+
+ publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
+
+ Thread.sleep(PUBLISH_WAIT_MS);
+
+ testRunner.run(1, false, false);
+
+ final List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
+ assertEquals(1, badFlowFiles.size());
+ assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
}
@Test
public void testSslContextService() throws InitializationException, TlsException {
- String brokerURI = "ssl://localhost:8883";
- TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
- runner.setVariable("brokerURI", brokerURI);
- runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
- runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
- runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
- runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
+ testRunner = initializeTestRunner();
+ testRunner.setVariable("brokerURI", "ssl://localhost:8883");
+ testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
final SSLContextService sslContextService = mock(SSLContextService.class);
final String identifier = SSLContextService.class.getSimpleName();
when(sslContextService.getIdentifier()).thenReturn(identifier);
- final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
+ final SSLContext sslContext = SslContextFactory.createSslContext(new TemporaryKeyStoreBuilder().build());
when(sslContextService.createContext()).thenReturn(sslContext);
- runner.addControllerService(identifier, sslContextService);
- runner.enableControllerService(sslContextService);
- runner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, identifier);
+ testRunner.addControllerService(identifier, sslContextService);
+ testRunner.enableControllerService(sslContextService);
+ testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, identifier);
- ConsumeMQTT processor = (ConsumeMQTT) runner.getProcessor();
- processor.onScheduled(runner.getProcessContext());
+ final ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
+ processor.onScheduled(testRunner.getProcessContext());
}
@Test
public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
+ testRunner = initializeTestRunner(mqttTestClient);
+
testRunner.run(1, false);
- ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
- ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
+ final ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
+ final ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
when(mock.getPayload()).thenReturn(new byte[0]);
- when(mock.getTopic()).thenReturn("testTopic");
- BlockingQueue mqttQueue = getMqttQueue(processor);
+ when(mock.getTopic()).thenReturn(TOPIC_NAME);
+ final BlockingQueue mqttQueue = getMqttQueue(processor);
mqttQueue.add(mock);
- ProcessSession session = testRunner.getProcessSessionFactory().createSession();
+ final ProcessSession session = testRunner.getProcessSessionFactory().createSession();
assertThrows(InvocationTargetException.class, () -> transferQueue(processor,
(ProcessSession) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{ProcessSession.class}, (proxy, method, args) -> {
@@ -128,8 +598,76 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
assertTrue(mqttQueue.contains(mock));
}
- @Override
- public void internalPublish(final StandardMqttMessage message, final String topicName) {
- mqttTestClient.publish(topicName, message);
+ private TestRunner initializeTestRunner() {
+ if (mqttTestClient != null) {
+ throw new IllegalStateException("mqttTestClient should be null, using ConsumeMQTT's default client!");
+ }
+
+ final TestRunner testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
+
+ setCommonProperties(testRunner);
+
+ return testRunner;
+ }
+
+ private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
+ final TestRunner testRunner = TestRunners.newTestRunner(new ConsumeMQTT() {
+ @Override
+ protected MqttClient createMqttClient() {
+ return mqttTestClient;
+ }
+ });
+
+ setCommonProperties(testRunner);
+
+ return testRunner;
+ }
+
+ private void setCommonProperties(TestRunner testRunner) {
+ testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, BROKER_URI);
+ testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, CLIENT_ID);
+ testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, TOPIC_NAME);
+ testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, INTERNAL_QUEUE_SIZE);
+ }
+
+ private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
+ final Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
+ f.setAccessible(true);
+ final MqttClient mqttClient = (MqttClient) f.get(processor);
+ return mqttClient.isConnected();
+ }
+
+
+ public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+ final Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
+ method.setAccessible(true);
+ method.invoke(processor, context);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static BlockingQueue getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
+ final Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
+ mqttQueueField.setAccessible(true);
+ return (BlockingQueue) mqttQueueField.get(consumeMQTT);
+ }
+
+ public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ final Method transferQueue = ConsumeMQTT.class.getDeclaredMethod("transferQueue", ProcessSession.class);
+ transferQueue.setAccessible(true);
+ transferQueue.invoke(consumeMQTT, session);
+ }
+
+ private void assertProvenanceEvents(int count){
+ final List provenanceEvents = testRunner.getProvenanceEvents();
+ assertNotNull(provenanceEvents);
+ assertEquals(count, provenanceEvents.size());
+ if (count > 0) {
+ assertEquals(ProvenanceEventType.RECEIVE, provenanceEvents.get(0).getEventType());
+ }
+ }
+
+ private void publishMessage(final String payload, final int qos) {
+ final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
+ mqttTestClient.publish(TOPIC_NAME, message);
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index 41181c65ed..b9815339da 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -17,52 +17,368 @@
package org.apache.nifi.processors.mqtt;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.processors.mqtt.common.MqttClient;
-import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
-import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import static java.util.Arrays.asList;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
+import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
+import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
-public class TestPublishMQTT extends TestPublishMqttCommon {
+public class TestPublishMQTT {
- @Override
- public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
- StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
- String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+ private static final String BROKER_URI = "tcp://localhost:1883";
+ private static final String TOPIC = "testTopic";
+ private static final String RETAIN = "false";
+
+ private MqttTestClient mqttTestClient;
+ private TestRunner testRunner;
+
+ @AfterEach
+ public void cleanup() {
+ testRunner = null;
+ mqttTestClient = null;
+ }
+
+ @Test
+ public void testQoS0() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ assertProvenanceEvent();
+
+ verifyPublishedMessage(testMessage.getBytes(), 0, false);
+ }
+
+ @Test
+ public void testQoS1() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 1, false);
+ }
+
+ @Test
+ public void testQoS2NotCleanSession() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, false);
+ }
+
+ @Test
+ public void testQoS2() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, false);
+ }
+
+ @Test
+ public void testRetainQoS2() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
+
+ testRunner.assertValid();
+
+ final String testMessage = "testMessage";
+ testRunner.enqueue(testMessage.getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent();
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testMessage.getBytes(), 2, true);
+ }
+
+ @Test
+ public void testPublishRecordSet() throws InitializationException {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final ArrayNode testInput = createTestJsonInput();
+
+ testRunner.enqueue(testInput.toString().getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
+
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+ verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+ verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile successfulFlowFile = flowFiles.get(0);
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
+ }
+
+ @Test
+ public void testPublishRecordSetFailed() throws InitializationException {
+ mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+ Mockito.doCallRealMethod()
+ .doThrow(new RuntimeException("Second publish failed."))
+ .when(mqttTestClient).publish(any(), any());
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final ArrayNode testInput = createTestJsonInput();
+
+ testRunner.enqueue(testInput.toString().getBytes());
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 1));
+
+ verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+ verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ List flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile failedFlowFile = flowFiles.get(0);
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+ }
+
+ @Test
+ public void testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws InitializationException {
+ mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+ Mockito.doCallRealMethod()
+ .doThrow(new RuntimeException("Second publish failed."))
+ .when(mqttTestClient).publish(any(), any());
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ final ArrayNode testInput = createTestJsonInput();
+
+ final Map attributes = new HashMap<>();
+ attributes.put(publishFailedIndexAttributeName, "1");
+ testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 2));
+
+ verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+ verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile failedFlowFile = flowFiles.get(0);
+ assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
+ }
+
+ @Test
+ public void testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws InitializationException {
+ mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
+ Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
+ testRunner = initializeTestRunner(mqttTestClient);
+
+ testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
+ testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
+ testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
+ testRunner.assertValid();
+
+ final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
+ final ArrayNode testInput = createTestJsonInput();
+
+ final Map attributes = new HashMap<>();
+ attributes.put(publishFailedIndexAttributeName, "1");
+ testRunner.enqueue(testInput.toString().getBytes(), attributes);
+
+ testRunner.run();
+
+ testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
+ assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, 3));
+
+ verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
+ verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
+ verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
+ verifyNoMorePublished();
+
+ final List flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+
+ final MockFlowFile successfulFlowFile = flowFiles.get(0);
+ assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
+ publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
+ }
+
+ private void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
+ final Pair lastPublished = mqttTestClient.getLastPublished();
+ final String lastPublishedTopic = lastPublished.getLeft();
+ final StandardMqttMessage lastPublishedMessage = lastPublished.getRight();
assertEquals(Arrays.toString(payload), Arrays.toString(lastPublishedMessage.getPayload()));
assertEquals(qos, lastPublishedMessage.getQos());
assertEquals(retain, lastPublishedMessage.isRetained());
- assertEquals(topic, lastPublishedTopic);
+ assertEquals(TOPIC, lastPublishedTopic);
}
- private MqttTestClient mqttTestClient;
-
- public class UnitTestablePublishMqtt extends PublishMQTT {
-
- public UnitTestablePublishMqtt(){
- super();
- }
-
- @Override
- protected MqttClient createMqttClient() throws MqttException {
- mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
- return mqttTestClient;
- }
+ private void verifyNoMorePublished() {
+ assertNull(mqttTestClient.getLastPublished(), "TestClient's queue should be empty.");
}
- @BeforeEach
- public void init() {
- UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
- testRunner = TestRunners.newTestRunner(proc);
- testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
- testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
- topic = "testTopic";
- testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
+ private ProvenanceEventRecord assertProvenanceEvent() {
+ final List provenanceEvents = testRunner.getProvenanceEvents();
+ assertNotNull(provenanceEvents);
+ assertEquals(1, provenanceEvents.size());
+
+ final ProvenanceEventRecord event = provenanceEvents.get(0);
+ assertEquals(ProvenanceEventType.SEND, event.getEventType());
+
+ return event;
+ }
+
+ private void assertProvenanceEvent(String expectedDetails) {
+ final ProvenanceEventRecord event = assertProvenanceEvent();
+ assertEquals(expectedDetails, event.getDetails());
+ }
+
+ private static ArrayNode createTestJsonInput() {
+ final ObjectMapper mapper = new ObjectMapper();
+
+ return mapper.createArrayNode().addAll(asList(
+ mapper.createObjectNode()
+ .put("recordId", 1)
+ .put("firstAttribute", "foo")
+ .put("secondAttribute", false),
+ mapper.createObjectNode()
+ .put("recordId", 2)
+ .put("firstAttribute", "bar")
+ .put("secondAttribute", true),
+ mapper.createObjectNode()
+ .put("recordId", 3)
+ .put("firstAttribute", "foobar")
+ .put("secondAttribute", false)
+ ));
+ }
+
+ private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
+ final TestRunner testRunner = TestRunners.newTestRunner(new PublishMQTT() {
+ @Override
+ protected MqttClient createMqttClient() {
+ return mqttTestClient;
+ }
+ });
+
+ testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, BROKER_URI);
+ testRunner.setProperty(PublishMQTT.PROP_RETAIN, RETAIN);
+ testRunner.setProperty(PublishMQTT.PROP_TOPIC, TOPIC);
+
+ return testRunner;
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
index 91997061bc..dcb87b612c 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
@@ -17,23 +17,26 @@
package org.apache.nifi.processors.mqtt.common;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.LinkedList;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
public class MqttTestClient implements MqttClient {
+ private final Queue> publishedMessages = new LinkedList<>();
+
public AtomicBoolean connected = new AtomicBoolean(false);
public MqttCallback mqttCallback;
public ConnectType type;
- public enum ConnectType {Publisher, Subscriber}
- private StandardMqttMessage lastPublishedMessage;
- private String lastPublishedTopic;
+ public enum ConnectType {Publisher, Subscriber}
public String subscribedTopic;
public int subscribedQos;
-
public MqttTestClient(ConnectType type) {
this.type = type;
}
@@ -62,8 +65,7 @@ public class MqttTestClient implements MqttClient {
public void publish(String topic, StandardMqttMessage message) {
switch (type) {
case Publisher:
- lastPublishedMessage = message;
- lastPublishedTopic = topic;
+ publishedMessages.add(Pair.of(topic, message));
break;
case Subscriber:
mqttCallback.messageArrived(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic));
@@ -82,11 +84,7 @@ public class MqttTestClient implements MqttClient {
this.mqttCallback = callback;
}
- public StandardMqttMessage getLastPublishedMessage() {
- return lastPublishedMessage;
- }
-
- public String getLastPublishedTopic() {
- return lastPublishedTopic;
+ public Pair getLastPublished() {
+ return publishedMessages.poll();
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtil.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtil.java
new file mode 100644
index 0000000000..d2e8de0382
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.util.TestRunner;
+
+public class MqttTestUtil {
+
+ public static String createJsonRecordSetReaderService(TestRunner testRunner) throws InitializationException {
+ final String id = "record-reader";
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ testRunner.addControllerService(id, jsonReader);
+ testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
+ testRunner.enableControllerService(jsonReader);
+ return id;
+ }
+
+ public static String createJsonRecordSetWriterService(TestRunner testRunner) throws InitializationException {
+ final String id = "record-writer";
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ testRunner.addControllerService(id, jsonWriter);
+ testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
+ testRunner.enableControllerService(jsonWriter);
+ return id;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
deleted file mode 100644
index 7cbaa11c8d..0000000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.common;
-
-import org.apache.nifi.json.JsonRecordSetWriter;
-import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.mqtt.ConsumeMQTT;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
-import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public abstract class TestConsumeMqttCommon {
-
- public int PUBLISH_WAIT_MS = 1000;
- public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
-
- public TestRunner testRunner;
- public String broker;
-
- private static final String STRING_MESSAGE = "testMessage";
- private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
-
- private static final int MOST_ONE = 0;
- private static final int LEAST_ONE = 1;
- private static final int EXACTLY_ONCE = 2;
-
- public abstract void internalPublish(StandardMqttMessage message, String topicName);
-
- @Test
- public void testClientIDConfiguration() {
- TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
- runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
- runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
- runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
- runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
- runner.assertValid();
-
- runner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
- runner.assertNotValid();
-
- runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
- runner.assertValid();
-
- runner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
- runner.assertValid();
- }
-
- @Test
- public void testLastWillConfig() {
- testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message");
- testRunner.assertNotValid();
- testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic");
- testRunner.assertNotValid();
- testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
- testRunner.assertNotValid();
- testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
- testRunner.assertValid();
- }
-
-
- @Test
- public void testQoS2() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
- assertProvenanceEvents(1);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
-
- @Test
- public void testQoS2NotCleanSession() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
- testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- consumeMQTT.onUnscheduled(testRunner.getProcessContext());
-
- publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
-
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- testRunner.run(1, false, false);
-
- testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
- assertProvenanceEvents(1);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
-
-
- @Test
- public void testQoS1() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(STRING_MESSAGE, LEAST_ONE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertTrue(flowFiles.size() > 0);
- assertProvenanceEvents(flowFiles.size());
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
-
- @Test
- public void testQoS1NotCleanSession() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
- testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- consumeMQTT.onUnscheduled(testRunner.getProcessContext());
-
- publishMessage(STRING_MESSAGE, LEAST_ONE);
-
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- testRunner.run(1, false, false);
-
- testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertTrue(flowFiles.size() > 0);
- assertProvenanceEvents(flowFiles.size());
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
-
- @Test
- public void testQoS0() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(STRING_MESSAGE, MOST_ONE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertTrue(flowFiles.size() < 2);
- assertProvenanceEvents(flowFiles.size());
-
- if(flowFiles.size() == 1) {
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
- }
-
- @Test
- public void testOnStoppedFinish() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
-
- testRunner.assertValid();
-
- final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array();
- ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, "testTopic");
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();
-
- Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
- f.setAccessible(true);
- @SuppressWarnings("unchecked")
- LinkedBlockingQueue queue = (LinkedBlockingQueue) f.get(consumeMQTT);
- queue.add(testMessage);
-
- consumeMQTT.onUnscheduled(testRunner.getProcessContext());
- consumeMQTT.onStopped(testRunner.getProcessContext());
-
- testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
- assertProvenanceEvents(1);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
-
- @Test
- public void testResizeBuffer() throws Exception {
- testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
- testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
- publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
- consumeMQTT.onUnscheduled(testRunner.getProcessContext());
-
- testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
- testRunner.assertNotValid();
-
- testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
- testRunner.assertValid();
-
- testRunner.run(1);
-
- testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
- assertProvenanceEvents(2);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- MockFlowFile flowFile = flowFiles.get(0);
-
- flowFile.assertContentEquals("testMessage");
- flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
- flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
- flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
- flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
- flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
- }
-
- @Test
- public void testConsumeRecordsWithAddedFields() throws Exception {
- testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
- testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
-
- final JsonTreeReader jsonReader = new JsonTreeReader();
- testRunner.addControllerService("record-reader", jsonReader);
- testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
- testRunner.enableControllerService(jsonReader);
-
- final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
- testRunner.addControllerService("record-writer", jsonWriter);
- testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
- testRunner.enableControllerService(jsonWriter);
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(JSON_PAYLOAD, MOST_ONE);
- publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
- publishMessage(JSON_PAYLOAD, MOST_ONE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertEquals(1, flowFiles.size());
- assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
- + "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
- new String(flowFiles.get(0).toByteArray()));
-
- List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertEquals(1, badFlowFiles.size());
- assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
-
- // clean runner by removing records reader/writer
- testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
- testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
- }
-
- @Test
- public void testConsumeDemarcator() throws Exception {
- testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(JSON_PAYLOAD, MOST_ONE);
- publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
- publishMessage(JSON_PAYLOAD, MOST_ONE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertEquals(flowFiles.size(), 1);
- assertEquals("{\"name\":\"Apache NiFi\"}\\n"
- + THIS_IS_NOT_JSON + "\\n"
- + "{\"name\":\"Apache NiFi\"}\\n",
- new String(flowFiles.get(0).toByteArray()));
-
- List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertEquals(0, badFlowFiles.size());
-
- // clean runner by removing message demarcator
- testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
- }
-
- @Test
- public void testConsumeRecordsWithoutAddedFields() throws Exception {
- testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
- testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
- testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
-
- final JsonTreeReader jsonReader = new JsonTreeReader();
- testRunner.addControllerService("record-reader", jsonReader);
- testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
- testRunner.enableControllerService(jsonReader);
-
- final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
- testRunner.addControllerService("record-writer", jsonWriter);
- testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
- testRunner.enableControllerService(jsonWriter);
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(JSON_PAYLOAD, LEAST_ONE);
- publishMessage(THIS_IS_NOT_JSON, LEAST_ONE);
- publishMessage(JSON_PAYLOAD, LEAST_ONE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- List flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
- assertEquals(1, flowFiles.size());
- assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
-
- List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertEquals(1, badFlowFiles.size());
- assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
-
- // clean runner by removing records reader/writer
- testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
- testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
- }
-
- @Test
- public void testConsumeRecordsOnlyBadData() throws Exception {
- testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
- testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
- testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
-
- final JsonTreeReader jsonReader = new JsonTreeReader();
- testRunner.addControllerService("record-reader", jsonReader);
- testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
- testRunner.enableControllerService(jsonReader);
-
- final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
- testRunner.addControllerService("record-writer", jsonWriter);
- testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
- testRunner.enableControllerService(jsonWriter);
-
- testRunner.assertValid();
-
- ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
- consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT, testRunner.getProcessContext());
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- assertTrue(isConnected(consumeMQTT));
-
- publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
-
- Thread.sleep(PUBLISH_WAIT_MS);
-
- testRunner.run(1, false, false);
-
- List badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
- assertEquals(1, badFlowFiles.size());
- assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
-
- // clean runner by removing records reader/writer
- testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
- testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
- }
-
- private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
- Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
- f.setAccessible(true);
- MqttClient mqttClient = (MqttClient) f.get(processor);
- return mqttClient.isConnected();
- }
-
-
- public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
- Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
- method.setAccessible(true);
- method.invoke(processor, context);
- }
-
- @SuppressWarnings("unchecked")
- public static BlockingQueue getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
- Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
- mqttQueueField.setAccessible(true);
- return (BlockingQueue) mqttQueueField.get(consumeMQTT);
- }
-
- public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
- Method transferQueue = ConsumeMQTT.class.getDeclaredMethod("transferQueue", ProcessSession.class);
- transferQueue.setAccessible(true);
- transferQueue.invoke(consumeMQTT, session);
- }
-
- private void assertProvenanceEvents(int count){
- List provenanceEvents = testRunner.getProvenanceEvents();
- assertNotNull(provenanceEvents);
- assertEquals(count, provenanceEvents.size());
- if (count > 0) {
- assertEquals(ProvenanceEventType.RECEIVE, provenanceEvents.get(0).getEventType());
- }
- }
-
- private void publishMessage(final String payload, final int qos) {
- final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
- internalPublish(message, "testTopic");
- }
-}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
deleted file mode 100644
index d82bc4da65..0000000000
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestPublishMqttCommon.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.mqtt.common;
-
-import org.apache.nifi.processors.mqtt.PublishMQTT;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
-import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-public abstract class TestPublishMqttCommon {
-
- public TestRunner testRunner;
- public String topic;
-
- public abstract void verifyPublishedMessage(byte[] payload, int qos, boolean retain);
-
- @Test
- public void testQoS0() {
- testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
-
- testRunner.assertValid();
-
- String testMessage = "testMessage";
- testRunner.enqueue(testMessage.getBytes());
-
- testRunner.run();
-
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
-
- testRunner.assertTransferCount(REL_SUCCESS, 1);
- assertProvenanceEvents();
-
- verifyPublishedMessage(testMessage.getBytes(), 0, false);
- }
-
- @Test
- public void testQoS1() {
- testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
-
- testRunner.assertValid();
-
- String testMessage = "testMessage";
- testRunner.enqueue(testMessage.getBytes());
-
- testRunner.run();
-
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
- assertProvenanceEvents();
-
- testRunner.assertTransferCount(REL_SUCCESS, 1);
- verifyPublishedMessage(testMessage.getBytes(), 1, false);
- }
-
- @Test
- public void testQoS2NotCleanSession() {
- // Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
- testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
- testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
-
- testRunner.assertValid();
-
- String testMessage = "testMessage";
- testRunner.enqueue(testMessage.getBytes());
-
- testRunner.run();
-
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
- assertProvenanceEvents();
-
- testRunner.assertTransferCount(REL_SUCCESS, 1);
- verifyPublishedMessage(testMessage.getBytes(), 2, false);
- }
-
- @Test
- public void testQoS2() {
- testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
-
- testRunner.assertValid();
-
- String testMessage = "testMessage";
- testRunner.enqueue(testMessage.getBytes());
-
- testRunner.run();
-
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
- assertProvenanceEvents();
-
- testRunner.assertTransferCount(REL_SUCCESS, 1);
- verifyPublishedMessage(testMessage.getBytes(), 2, false);
- }
-
- @Test
- public void testRetainQoS2() {
- testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
- testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
-
- testRunner.assertValid();
-
- String testMessage = "testMessage";
- testRunner.enqueue(testMessage.getBytes());
-
- testRunner.run();
-
- testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
- assertProvenanceEvents();
-
- testRunner.assertTransferCount(REL_SUCCESS, 1);
- verifyPublishedMessage(testMessage.getBytes(), 2, true);
- }
-
- private void assertProvenanceEvents(){
- List provenanceEvents = testRunner.getProvenanceEvents();
- assertNotNull(provenanceEvents);
- assertEquals(1, provenanceEvents.size());
- assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType());
- }
-}