NIFI-10411 Add record processing feature to PublishMQTT processor

This closes #6373.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2022-08-30 14:14:21 +02:00 committed by Peter Turcsanyi
parent b12510e54a
commit eb68ffad70
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
10 changed files with 1161 additions and 867 deletions

View File

@ -93,5 +93,10 @@
<artifactId>nifi-schema-registry-service-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -161,22 +161,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for received messages")
.identifiesControllerService(RecordReaderFactory.class)
.required(false)
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(false)
.build();
public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
.name("add-attributes-as-fields")
.displayName("Add attributes as fields")
@ -201,6 +185,16 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
+ "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_READER)
.description("The Record Reader to use for parsing received MQTT Messages into Records.")
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_WRITER)
.description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
.build();
private volatile int qos;
private volatile String topicPrefix = "";
private volatile String topicFilter;
@ -229,10 +223,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
innerDescriptorsList.add(PROP_TOPIC_FILTER);
innerDescriptorsList.add(PROP_QOS);
innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
innerDescriptorsList.add(RECORD_READER);
innerDescriptorsList.add(RECORD_WRITER);
innerDescriptorsList.add(ADD_ATTRIBUTES_AS_FIELDS);
innerDescriptorsList.add(MESSAGE_DEMARCATOR);
innerDescriptorsList.add(RECORD_READER);
innerDescriptorsList.add(RECORD_WRITER);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<>();
@ -292,16 +286,10 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("both Record Reader and Writer must be set when used.").build());
}
final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
if (readerIsSet && demarcatorIsSet) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
.explanation("message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
}
return results;
@ -461,7 +449,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, ReceivedMqttMessage mqttMessage) {
FlowFile messageFlowfile = session.create();
Map<String, String> attrs = new HashMap<>();
final Map<String, String> attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
@ -476,7 +464,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
FlowFile flowFile = session.create();
final FlowFile flowFile = session.create();
session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
final Map<String, String> attributes = new HashMap<>();

View File

@ -39,18 +39,30 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttCallback;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Optional.ofNullable;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -86,6 +98,16 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
.addValidator(RETAIN_VALIDATOR)
.build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_READER)
.description("The Record Reader to use for parsing the incoming FlowFile into Records.")
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_RECORD_WRITER)
.description("The Record Writer to use for serializing Records before publishing them as an MQTT Message.")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
@ -103,6 +125,8 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
innerDescriptorsList.add(PROP_TOPIC);
innerDescriptorsList.add(PROP_QOS);
innerDescriptorsList.add(PROP_RETAIN);
innerDescriptorsList.add(RECORD_READER);
innerDescriptorsList.add(RECORD_WRITER);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<>();
@ -111,9 +135,17 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
}
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = ".mqtt.publish.failed.index";
private String publishFailedIndexAttributeName;
@Override
protected void init(final ProcessorInitializationContext context) {
logger = getLogger();
publishFailedIndexAttributeName = getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
}
@Override
@ -162,28 +194,105 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
return;
}
// do the read
final byte[] messageContent = new byte[(int) flowfile.getSize()];
session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true));
if (context.getProperty(RECORD_READER).isSet()) {
processRecordSet(context, session, flowfile, topic);
} else {
processStandardFlowFile(context, session, flowfile, topic);
}
}
private void processRecordSet(ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
final StopWatch stopWatch = new StopWatch(true);
final AtomicInteger processedRecords = new AtomicInteger();
try {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Long previousProcessFailedAt = ofNullable(flowfile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
session.read(flowfile, in -> {
try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
Record record;
while ((record = recordSet.next()) != null) {
if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
processedRecords.getAndIncrement();
continue;
}
baos.reset();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
writer.write(record);
writer.flush();
}
final byte[] messageContent = baos.toByteArray();
publishMessage(context, flowfile, topic, messageContent);
processedRecords.getAndIncrement();
}
} catch (SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("An error happened during creating components for serialization.", e);
}
});
FlowFile successFlowFile = flowfile;
String provenanceEventDetails;
if (previousProcessFailedAt != null) {
successFlowFile = session.removeAttribute(flowfile, publishFailedIndexAttributeName);
provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, processedRecords.get());
} else {
provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, processedRecords.get());
}
session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), provenanceEventDetails, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(successFlowFile, REL_SUCCESS);
} catch (Exception e) {
logger.error("An error happened during publishing records. Routing to failure.", e);
FlowFile failedFlowFile = session.putAttribute(flowfile, publishFailedIndexAttributeName, String.valueOf(processedRecords.get()));
if (processedRecords.get() > 0) {
session.getProvenanceReporter().send(
failedFlowFile,
clientProperties.getBroker(),
String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords.get()),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
session.transfer(failedFlowFile, REL_FAILURE);
}
}
private void processStandardFlowFile(ProcessContext context, ProcessSession session, FlowFile flowfile, String topic) {
try {
final byte[] messageContent = new byte[(int) flowfile.getSize()];
session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true));
final StopWatch stopWatch = new StopWatch(true);
publishMessage(context, flowfile, topic, messageContent);
session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowfile, REL_SUCCESS);
} catch (Exception e) {
logger.error("An error happened during publishing a message. Routing to failure.", e);
session.transfer(flowfile, REL_FAILURE);
}
}
private void publishMessage(ProcessContext context, FlowFile flowfile, String topic, byte[] messageContent) {
int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
boolean retained = context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean();
final StandardMqttMessage mqttMessage = new StandardMqttMessage(messageContent, qos, retained);
try {
final StopWatch stopWatch = new StopWatch(true);
/*
* Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
* MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
*/
mqttClient.publish(topic, mqttMessage);
session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowfile, REL_SUCCESS);
} catch (MqttException me) {
logger.error("Failed to publish message.", me);
session.transfer(flowfile, REL_FAILURE);
}
mqttClient.publish(topic, mqttMessage);
}
private void initializeClient(ProcessContext context) {

View File

@ -32,6 +32,8 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.ssl.SSLContextService;
import java.net.URI;
@ -42,6 +44,7 @@ import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
import static org.apache.commons.lang3.StringUtils.EMPTY;
@ -93,7 +96,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
};
private static String getSupportedSchemeList() {
return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new));
return Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).collect(Collectors.joining(", "));
}
public static final Validator RETAIN_VALIDATOR = (subject, input, context) -> {
@ -232,6 +235,20 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
public static final PropertyDescriptor BASE_RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.identifiesControllerService(RecordReaderFactory.class)
.required(false)
.build();
public static final PropertyDescriptor BASE_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(false)
.build();
public static List<PropertyDescriptor> getAbstractPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROP_BROKER_URI);
@ -287,6 +304,13 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
results.add(new ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false).explanation("it is not valid URI syntax.").build());
}
final boolean readerIsSet = validationContext.getProperty(BASE_RECORD_READER).isSet();
final boolean writerIsSet = validationContext.getProperty(BASE_RECORD_WRITER).isSet();
if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("both Record Reader and Writer must be set when used.").build());
}
return results;
}
@ -366,9 +390,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
final PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
if (sslProp.isSet()) {
final SSLContextService sslContextService = (SSLContextService) sslProp.asControllerService();
final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
clientProperties.setTlsConfiguration(sslContextService.createTlsConfiguration());
}

View File

@ -17,105 +17,575 @@
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import javax.net.ssl.SSLContext;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestConsumeMQTT extends TestConsumeMqttCommon {
private static TlsConfiguration tlsConfiguration;
public class TestConsumeMQTT {
public MqttTestClient mqttTestClient;
private static final int PUBLISH_WAIT_MS = 0;
private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
private static final String BROKER_URI = "tcp://localhost:1883";
private static final String CLIENT_ID = "TestClient";
private static final String TOPIC_NAME = "testTopic";
private static final String INTERNAL_QUEUE_SIZE = "100";
public class UnitTestableConsumeMqtt extends ConsumeMQTT {
private static final String STRING_MESSAGE = "testMessage";
private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
public UnitTestableConsumeMqtt(){
super();
}
private static final int AT_MOST_ONCE = 0;
private static final int AT_LEAST_ONCE = 1;
private static final int EXACTLY_ONCE = 2;
@Override
protected MqttClient createMqttClient() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
return mqttTestClient;
private MqttTestClient mqttTestClient;
private TestRunner testRunner;
@AfterEach
public void cleanup() {
testRunner = null;
mqttTestClient = null;
}
@Test
public void testClientIDConfiguration() {
testRunner = initializeTestRunner();
testRunner.assertValid();
testRunner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
testRunner.assertValid();
testRunner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
testRunner.assertValid();
}
@Test
public void testLastWillConfig() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
testRunner.assertValid();
}
@Test
public void testQoS2() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS2NotCleanSession() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, AT_LEAST_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1NotCleanSession() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
publishMessage(STRING_MESSAGE, AT_LEAST_ONCE);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS0() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, AT_MOST_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() < 2);
assertProvenanceEvents(flowFiles.size());
if(flowFiles.size() == 1) {
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
}
@BeforeAll
public static void setTlsConfiguration() {
tlsConfiguration = new TemporaryKeyStoreBuilder().build();
@Test
public void testOnStoppedFinish() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array();
final ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, TOPIC_NAME);
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();
final Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
f.setAccessible(true);
@SuppressWarnings("unchecked")
final LinkedBlockingQueue<ReceivedMqttMessage> queue = (LinkedBlockingQueue<ReceivedMqttMessage>) f.get(consumeMQTT);
queue.add(testMessage);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
consumeMQTT.onStopped(testRunner.getProcessContext());
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@BeforeEach
public void init() {
PUBLISH_WAIT_MS = 0;
@Test
public void testResizeBuffer() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
broker = "tcp://localhost:1883";
UnitTestableConsumeMqtt proc = new UnitTestableConsumeMqtt();
testRunner = TestRunners.newTestRunner(proc);
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, broker);
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
testRunner.assertValid();
testRunner.run(1);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
assertProvenanceEvents(2);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
final MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, BROKER_URI);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, TOPIC_NAME);
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testConsumeRecordsWithAddedFields() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
publishMessage(THIS_IS_NOT_JSON, AT_MOST_ONCE);
publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(1, flowFiles.size());
assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+ "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
new String(flowFiles.get(0).toByteArray()));
final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
}
@Test
public void testConsumeDemarcator() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
publishMessage(THIS_IS_NOT_JSON, AT_MOST_ONCE);
publishMessage(JSON_PAYLOAD, AT_MOST_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(flowFiles.size(), 1);
assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+ THIS_IS_NOT_JSON + "\\n"
+ "{\"name\":\"Apache NiFi\"}\\n",
new String(flowFiles.get(0).toByteArray()));
final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(0, badFlowFiles.size());
}
@Test
public void testConsumeRecordsWithoutAddedFields() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(JSON_PAYLOAD, AT_LEAST_ONCE);
publishMessage(THIS_IS_NOT_JSON, AT_LEAST_ONCE);
publishMessage(JSON_PAYLOAD, AT_LEAST_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(1, flowFiles.size());
assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
}
@Test
public void testConsumeRecordsOnlyBadData() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
testRunner.assertValid();
final ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
final List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
}
@Test
public void testSslContextService() throws InitializationException, TlsException {
String brokerURI = "ssl://localhost:8883";
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
runner.setVariable("brokerURI", brokerURI);
runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
testRunner = initializeTestRunner();
testRunner.setVariable("brokerURI", "ssl://localhost:8883");
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
final SSLContextService sslContextService = mock(SSLContextService.class);
final String identifier = SSLContextService.class.getSimpleName();
when(sslContextService.getIdentifier()).thenReturn(identifier);
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
final SSLContext sslContext = SslContextFactory.createSslContext(new TemporaryKeyStoreBuilder().build());
when(sslContextService.createContext()).thenReturn(sslContext);
runner.addControllerService(identifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, identifier);
testRunner.addControllerService(identifier, sslContextService);
testRunner.enableControllerService(sslContextService);
testRunner.setProperty(ConsumeMQTT.PROP_SSL_CONTEXT_SERVICE, identifier);
ConsumeMQTT processor = (ConsumeMQTT) runner.getProcessor();
processor.onScheduled(runner.getProcessContext());
final ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
processor.onScheduled(testRunner.getProcessContext());
}
@Test
public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.run(1, false);
ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
final ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
final ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
when(mock.getPayload()).thenReturn(new byte[0]);
when(mock.getTopic()).thenReturn("testTopic");
BlockingQueue<ReceivedMqttMessage> mqttQueue = getMqttQueue(processor);
when(mock.getTopic()).thenReturn(TOPIC_NAME);
final BlockingQueue<ReceivedMqttMessage> mqttQueue = getMqttQueue(processor);
mqttQueue.add(mock);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
final ProcessSession session = testRunner.getProcessSessionFactory().createSession();
assertThrows(InvocationTargetException.class, () -> transferQueue(processor,
(ProcessSession) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{ProcessSession.class}, (proxy, method, args) -> {
@ -128,8 +598,76 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
assertTrue(mqttQueue.contains(mock));
}
@Override
public void internalPublish(final StandardMqttMessage message, final String topicName) {
mqttTestClient.publish(topicName, message);
private TestRunner initializeTestRunner() {
if (mqttTestClient != null) {
throw new IllegalStateException("mqttTestClient should be null, using ConsumeMQTT's default client!");
}
final TestRunner testRunner = TestRunners.newTestRunner(ConsumeMQTT.class);
setCommonProperties(testRunner);
return testRunner;
}
private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
final TestRunner testRunner = TestRunners.newTestRunner(new ConsumeMQTT() {
@Override
protected MqttClient createMqttClient() {
return mqttTestClient;
}
});
setCommonProperties(testRunner);
return testRunner;
}
private void setCommonProperties(TestRunner testRunner) {
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, BROKER_URI);
testRunner.setProperty(ConsumeMQTT.PROP_CLIENTID, CLIENT_ID);
testRunner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, TOPIC_NAME);
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, INTERNAL_QUEUE_SIZE);
}
private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
final Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
f.setAccessible(true);
final MqttClient mqttClient = (MqttClient) f.get(processor);
return mqttClient.isConnected();
}
public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
final Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
method.setAccessible(true);
method.invoke(processor, context);
}
@SuppressWarnings("unchecked")
public static BlockingQueue<ReceivedMqttMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
final Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
mqttQueueField.setAccessible(true);
return (BlockingQueue<ReceivedMqttMessage>) mqttQueueField.get(consumeMQTT);
}
public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
final Method transferQueue = ConsumeMQTT.class.getDeclaredMethod("transferQueue", ProcessSession.class);
transferQueue.setAccessible(true);
transferQueue.invoke(consumeMQTT, session);
}
private void assertProvenanceEvents(int count){
final List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(count, provenanceEvents.size());
if (count > 0) {
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvents.get(0).getEventType());
}
}
private void publishMessage(final String payload, final int qos) {
final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
mqttTestClient.publish(TOPIC_NAME, message);
}
}

View File

@ -17,52 +17,368 @@
package org.apache.nifi.processors.mqtt;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Arrays.asList;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetReaderService;
import static org.apache.nifi.processors.mqtt.common.MqttTestUtil.createJsonRecordSetWriterService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
public class TestPublishMQTT extends TestPublishMqttCommon {
public class TestPublishMQTT {
@Override
public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
private static final String BROKER_URI = "tcp://localhost:1883";
private static final String TOPIC = "testTopic";
private static final String RETAIN = "false";
private MqttTestClient mqttTestClient;
private TestRunner testRunner;
@AfterEach
public void cleanup() {
testRunner = null;
mqttTestClient = null;
}
@Test
public void testQoS0() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
testRunner.assertValid();
final String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
assertProvenanceEvent();
verifyPublishedMessage(testMessage.getBytes(), 0, false);
}
@Test
public void testQoS1() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
testRunner.assertValid();
final String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 1, false);
}
@Test
public void testQoS2NotCleanSession() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
// Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
final String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
}
@Test
public void testQoS2() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
}
@Test
public void testRetainQoS2() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
testRunner.assertValid();
final String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, true);
}
@Test
public void testPublishRecordSet() throws InitializationException {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final ArrayNode testInput = createTestJsonInput();
testRunner.enqueue(testInput.toString().getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, 3));
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
verifyNoMorePublished();
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile successfulFlowFile = flowFiles.get(0);
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
}
@Test
public void testPublishRecordSetFailed() throws InitializationException {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final ArrayNode testInput = createTestJsonInput();
testRunner.enqueue(testInput.toString().getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 1));
verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
verifyPublishedMessage(testInput.get(0).toString().getBytes(), 2, false);
verifyNoMorePublished();
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
assertEquals(1, flowFiles.size());
final MockFlowFile failedFlowFile = flowFiles.get(0);
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
}
@Test
public void testContinuePublishRecordsAndFailAgainWhenPreviousPublishFailed() throws InitializationException {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
final ArrayNode testInput = createTestJsonInput();
final Map<String, String> attributes = new HashMap<>();
attributes.put(publishFailedIndexAttributeName, "1");
testRunner.enqueue(testInput.toString().getBytes(), attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, 2));
verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
verifyNoMorePublished();
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
assertEquals(1, flowFiles.size());
final MockFlowFile failedFlowFile = flowFiles.get(0);
assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
}
@Test
public void testContinuePublishRecordsSuccessfullyWhenPreviousPublishFailed() throws InitializationException {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
final ArrayNode testInput = createTestJsonInput();
final Map<String, String> attributes = new HashMap<>();
attributes.put(publishFailedIndexAttributeName, "1");
testRunner.enqueue(testInput.toString().getBytes(), attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, 3));
verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
verifyPublishedMessage(testInput.get(1).toString().getBytes(), 2, false);
verifyPublishedMessage(testInput.get(2).toString().getBytes(), 2, false);
verifyNoMorePublished();
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile successfulFlowFile = flowFiles.get(0);
assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
}
private void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
final Pair<String, StandardMqttMessage> lastPublished = mqttTestClient.getLastPublished();
final String lastPublishedTopic = lastPublished.getLeft();
final StandardMqttMessage lastPublishedMessage = lastPublished.getRight();
assertEquals(Arrays.toString(payload), Arrays.toString(lastPublishedMessage.getPayload()));
assertEquals(qos, lastPublishedMessage.getQos());
assertEquals(retain, lastPublishedMessage.isRetained());
assertEquals(topic, lastPublishedTopic);
assertEquals(TOPIC, lastPublishedTopic);
}
private MqttTestClient mqttTestClient;
public class UnitTestablePublishMqtt extends PublishMQTT {
public UnitTestablePublishMqtt(){
super();
}
@Override
protected MqttClient createMqttClient() throws MqttException {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
return mqttTestClient;
}
private void verifyNoMorePublished() {
assertNull(mqttTestClient.getLastPublished(), "TestClient's queue should be empty.");
}
@BeforeEach
public void init() {
UnitTestablePublishMqtt proc = new UnitTestablePublishMqtt();
testRunner = TestRunners.newTestRunner(proc);
testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
testRunner.setProperty(PublishMQTT.PROP_RETAIN, "false");
topic = "testTopic";
testRunner.setProperty(PublishMQTT.PROP_TOPIC, topic);
private ProvenanceEventRecord assertProvenanceEvent() {
final List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(1, provenanceEvents.size());
final ProvenanceEventRecord event = provenanceEvents.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
return event;
}
private void assertProvenanceEvent(String expectedDetails) {
final ProvenanceEventRecord event = assertProvenanceEvent();
assertEquals(expectedDetails, event.getDetails());
}
private static ArrayNode createTestJsonInput() {
final ObjectMapper mapper = new ObjectMapper();
return mapper.createArrayNode().addAll(asList(
mapper.createObjectNode()
.put("recordId", 1)
.put("firstAttribute", "foo")
.put("secondAttribute", false),
mapper.createObjectNode()
.put("recordId", 2)
.put("firstAttribute", "bar")
.put("secondAttribute", true),
mapper.createObjectNode()
.put("recordId", 3)
.put("firstAttribute", "foobar")
.put("secondAttribute", false)
));
}
private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
final TestRunner testRunner = TestRunners.newTestRunner(new PublishMQTT() {
@Override
protected MqttClient createMqttClient() {
return mqttTestClient;
}
});
testRunner.setProperty(PublishMQTT.PROP_BROKER_URI, BROKER_URI);
testRunner.setProperty(PublishMQTT.PROP_RETAIN, RETAIN);
testRunner.setProperty(PublishMQTT.PROP_TOPIC, TOPIC);
return testRunner;
}
}

View File

@ -17,23 +17,26 @@
package org.apache.nifi.processors.mqtt.common;
import org.apache.commons.lang3.tuple.Pair;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
public class MqttTestClient implements MqttClient {
private final Queue<Pair<String, StandardMqttMessage>> publishedMessages = new LinkedList<>();
public AtomicBoolean connected = new AtomicBoolean(false);
public MqttCallback mqttCallback;
public ConnectType type;
public enum ConnectType {Publisher, Subscriber}
private StandardMqttMessage lastPublishedMessage;
private String lastPublishedTopic;
public enum ConnectType {Publisher, Subscriber}
public String subscribedTopic;
public int subscribedQos;
public MqttTestClient(ConnectType type) {
this.type = type;
}
@ -62,8 +65,7 @@ public class MqttTestClient implements MqttClient {
public void publish(String topic, StandardMqttMessage message) {
switch (type) {
case Publisher:
lastPublishedMessage = message;
lastPublishedTopic = topic;
publishedMessages.add(Pair.of(topic, message));
break;
case Subscriber:
mqttCallback.messageArrived(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic));
@ -82,11 +84,7 @@ public class MqttTestClient implements MqttClient {
this.mqttCallback = callback;
}
public StandardMqttMessage getLastPublishedMessage() {
return lastPublishedMessage;
}
public String getLastPublishedTopic() {
return lastPublishedTopic;
public Pair<String, StandardMqttMessage> getLastPublished() {
return publishedMessages.poll();
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.TestRunner;
public class MqttTestUtil {
public static String createJsonRecordSetReaderService(TestRunner testRunner) throws InitializationException {
final String id = "record-reader";
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService(id, jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
return id;
}
public static String createJsonRecordSetWriterService(TestRunner testRunner) throws InitializationException {
final String id = "record-writer";
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService(id, jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
return id;
}
}

View File

@ -1,587 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public abstract class TestConsumeMqttCommon {
public int PUBLISH_WAIT_MS = 1000;
public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
public TestRunner testRunner;
public String broker;
private static final String STRING_MESSAGE = "testMessage";
private static final String JSON_PAYLOAD = "{\"name\":\"Apache NiFi\"}";
private static final int MOST_ONE = 0;
private static final int LEAST_ONE = 1;
private static final int EXACTLY_ONCE = 2;
public abstract void internalPublish(StandardMqttMessage message, String topicName);
@Test
public void testClientIDConfiguration() {
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
runner.assertValid();
runner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
runner.assertNotValid();
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
runner.assertValid();
runner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
runner.assertValid();
}
@Test
public void testLastWillConfig() {
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_TOPIC, "lastWill topic");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_QOS, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_RETAIN, "false");
testRunner.assertValid();
}
@Test
public void testQoS2() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS2NotCleanSession() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, LEAST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS1NotCleanSession() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "1");
testRunner.setProperty(ConsumeMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
publishMessage(STRING_MESSAGE, LEAST_ONE);
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
testRunner.run(1, false, false);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() > 0);
assertProvenanceEvents(flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "1");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testQoS0() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "0");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, MOST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertTrue(flowFiles.size() < 2);
assertProvenanceEvents(flowFiles.size());
if(flowFiles.size() == 1) {
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "0");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
}
@Test
public void testOnStoppedFinish() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.assertValid();
final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array();
ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, "testTopic");
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
consumeMQTT.processSessionFactory = testRunner.getProcessSessionFactory();
Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
f.setAccessible(true);
@SuppressWarnings("unchecked")
LinkedBlockingQueue<ReceivedMqttMessage> queue = (LinkedBlockingQueue<ReceivedMqttMessage>) f.get(consumeMQTT);
queue.add(testMessage);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
consumeMQTT.onStopped(testRunner.getProcessContext());
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 1);
assertProvenanceEvents(1);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testResizeBuffer() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_QOS, "2");
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "2");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
publishMessage(STRING_MESSAGE, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "1");
testRunner.assertNotValid();
testRunner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "3");
testRunner.assertValid();
testRunner.run(1);
testRunner.assertTransferCount(ConsumeMQTT.REL_MESSAGE, 2);
assertProvenanceEvents(2);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
MockFlowFile flowFile = flowFiles.get(0);
flowFile.assertContentEquals("testMessage");
flowFile.assertAttributeEquals(BROKER_ATTRIBUTE_KEY, broker);
flowFile.assertAttributeEquals(TOPIC_ATTRIBUTE_KEY, "testTopic");
flowFile.assertAttributeEquals(QOS_ATTRIBUTE_KEY, "2");
flowFile.assertAttributeEquals(IS_DUPLICATE_ATTRIBUTE_KEY, "false");
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
}
@Test
public void testConsumeRecordsWithAddedFields() throws Exception {
testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService("record-reader", jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record-writer", jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(JSON_PAYLOAD, MOST_ONE);
publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
publishMessage(JSON_PAYLOAD, MOST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(1, flowFiles.size());
assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
+ "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
@Test
public void testConsumeDemarcator() throws Exception {
testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(JSON_PAYLOAD, MOST_ONE);
publishMessage(THIS_IS_NOT_JSON, MOST_ONE);
publishMessage(JSON_PAYLOAD, MOST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(flowFiles.size(), 1);
assertEquals("{\"name\":\"Apache NiFi\"}\\n"
+ THIS_IS_NOT_JSON + "\\n"
+ "{\"name\":\"Apache NiFi\"}\\n",
new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(0, badFlowFiles.size());
// clean runner by removing message demarcator
testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
}
@Test
public void testConsumeRecordsWithoutAddedFields() throws Exception {
testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService("record-reader", jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record-writer", jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(JSON_PAYLOAD, LEAST_ONE);
publishMessage(THIS_IS_NOT_JSON, LEAST_ONE);
publishMessage(JSON_PAYLOAD, LEAST_ONE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
assertEquals(1, flowFiles.size());
assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache NiFi\"}]", new String(flowFiles.get(0).toByteArray()));
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
@Test
public void testConsumeRecordsOnlyBadData() throws Exception {
testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");
final JsonTreeReader jsonReader = new JsonTreeReader();
testRunner.addControllerService("record-reader", jsonReader);
testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
testRunner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
testRunner.addControllerService("record-writer", jsonWriter);
testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
testRunner.enableControllerService(jsonWriter);
testRunner.assertValid();
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
assertTrue(isConnected(consumeMQTT));
publishMessage(THIS_IS_NOT_JSON, EXACTLY_ONCE);
Thread.sleep(PUBLISH_WAIT_MS);
testRunner.run(1, false, false);
List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
assertEquals(1, badFlowFiles.size());
assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));
// clean runner by removing records reader/writer
testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
f.setAccessible(true);
MqttClient mqttClient = (MqttClient) f.get(processor);
return mqttClient.isConnected();
}
public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
method.setAccessible(true);
method.invoke(processor, context);
}
@SuppressWarnings("unchecked")
public static BlockingQueue<ReceivedMqttMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
mqttQueueField.setAccessible(true);
return (BlockingQueue<ReceivedMqttMessage>) mqttQueueField.get(consumeMQTT);
}
public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Method transferQueue = ConsumeMQTT.class.getDeclaredMethod("transferQueue", ProcessSession.class);
transferQueue.setAccessible(true);
transferQueue.invoke(consumeMQTT, session);
}
private void assertProvenanceEvents(int count){
List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(count, provenanceEvents.size());
if (count > 0) {
assertEquals(ProvenanceEventType.RECEIVE, provenanceEvents.get(0).getEventType());
}
}
private void publishMessage(final String payload, final int qos) {
final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
internalPublish(message, "testTopic");
}
}

View File

@ -1,140 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.processors.mqtt.PublishMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.TestRunner;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public abstract class TestPublishMqttCommon {
public TestRunner testRunner;
public String topic;
public abstract void verifyPublishedMessage(byte[] payload, int qos, boolean retain);
@Test
public void testQoS0() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "0");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
testRunner.assertTransferCount(REL_SUCCESS, 1);
assertProvenanceEvents();
verifyPublishedMessage(testMessage.getBytes(), 0, false);
}
@Test
public void testQoS1() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "1");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 1, false);
}
@Test
public void testQoS2NotCleanSession() {
// Publisher executes synchronously so the only time whether its Clean or Not matters is when the processor stops in the middle of the publishing
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.setProperty(PublishMQTT.PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE);
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
}
@Test
public void testQoS2() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, false);
}
@Test
public void testRetainQoS2() {
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.setProperty(PublishMQTT.PROP_RETAIN, "true");
testRunner.assertValid();
String testMessage = "testMessage";
testRunner.enqueue(testMessage.getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvents();
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testMessage.getBytes(), 2, true);
}
private void assertProvenanceEvents(){
List<ProvenanceEventRecord> provenanceEvents = testRunner.getProvenanceEvents();
assertNotNull(provenanceEvents);
assertEquals(1, provenanceEvents.size());
assertEquals(ProvenanceEventType.SEND, provenanceEvents.get(0).getEventType());
}
}