NIFI-10644 Add Message Demarcator-style processing in PublishMQTT

This closes #6530.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2022-10-14 00:40:02 +02:00 committed by Peter Turcsanyi
parent eecb6bfb38
commit 298dd2024e
No known key found for this signature in database
GPG Key ID: 55A813F1C3E553DC
5 changed files with 338 additions and 78 deletions

View File

@ -33,7 +33,6 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -172,6 +171,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.description("The Record Writer to use for serializing Records before writing them to a FlowFile.")
.build();
public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR)
.description("With this property, you have an option to output FlowFiles which contains multiple messages. "
+ "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+ "multiple messages. This is an optional property ; if not provided, and if not defining a "
+ "Record Reader/Writer, each message received will result in a single FlowFile. To enter special "
+ "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
.build();
public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
.name("add-attributes-as-fields")
.displayName("Add attributes as fields")
@ -184,19 +192,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.dependsOn(RECORD_READER)
.build();
public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
.name("message-demarcator")
.displayName("Message Demarcator")
.required(false)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.description("With this property, you have an option to output FlowFiles which contains multiple messages. "
+ "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+ "multiple messages. This is an optional property ; if not provided, and if not defining a "
+ "Reader/Writer, each message received will result in a single FlowFile which. To enter special "
+ "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
.build();
private volatile int qos;
private volatile String topicPrefix = "";
private volatile String topicFilter;
@ -296,13 +291,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
.build());
}
final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
if (readerIsSet && demarcatorIsSet) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
}
return results;
}
@ -345,7 +333,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
}
@OnStopped
public void onStopped(final ProcessContext context) {
if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {

View File

@ -54,10 +54,14 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -109,6 +113,15 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
.description("The Record Writer to use for serializing Records before publishing them as an MQTT Message.")
.build();
public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BASE_MESSAGE_DEMARCATOR)
.description("With this property, you have an option to publish multiple messages from a single FlowFile. "
+ "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+ "the FlowFile content. This is an optional property ; if not provided, and if not defining a "
+ "Record Reader/Writer, each FlowFile will be published as a single message. To enter special "
+ "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles that are sent successfully to the destination are transferred to this relationship.")
@ -132,6 +145,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
PROP_QOS,
RECORD_READER,
RECORD_WRITER,
MESSAGE_DEMARCATOR,
PROP_CONN_TIMEOUT,
PROP_KEEP_ALIVE_INTERVAL,
PROP_LAST_WILL_MESSAGE,
@ -145,10 +159,6 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
REL_FAILURE
)));
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
static final String ATTR_PUBLISH_FAILED_INDEX_SUFFIX = ".mqtt.publish.failed.index";
private String publishFailedIndexAttributeName;
@ -205,62 +215,31 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
if (context.getProperty(RECORD_READER).isSet()) {
processRecordSet(context, session, flowfile, topic);
processMultiMessageFlowFile(new ProcessRecordSetStrategy(), context, session, flowfile, topic);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
processMultiMessageFlowFile(new ProcessDemarcatedContentStrategy(), context, session, flowfile, topic);
} else {
processStandardFlowFile(context, session, flowfile, topic);
}
}
private void processRecordSet(ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
private void processMultiMessageFlowFile(ProcessStrategy processStrategy, ProcessContext context, ProcessSession session, final FlowFile flowfile, String topic) {
final StopWatch stopWatch = new StopWatch(true);
final AtomicInteger processedRecords = new AtomicInteger();
try {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final Long previousProcessFailedAt = ofNullable(flowfile.getAttribute(publishFailedIndexAttributeName)).map(Long::valueOf).orElse(null);
session.read(flowfile, in -> {
try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
Record record;
while ((record = recordSet.next()) != null) {
if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
processedRecords.getAndIncrement();
continue;
}
baos.reset();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
writer.write(record);
writer.flush();
}
final byte[] messageContent = baos.toByteArray();
publishMessage(context, flowfile, topic, messageContent);
processedRecords.getAndIncrement();
}
} catch (SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("An error happened during creating components for serialization.", e);
}
});
session.read(flowfile, in -> processStrategy.process(context, flowfile, in, topic, processedRecords, previousProcessFailedAt));
FlowFile successFlowFile = flowfile;
String provenanceEventDetails;
if (previousProcessFailedAt != null) {
successFlowFile = session.removeAttribute(flowfile, publishFailedIndexAttributeName);
provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER, processedRecords.get());
provenanceEventDetails = String.format(processStrategy.getRecoverTemplateMessage(), processedRecords.get());
} else {
provenanceEventDetails = String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS, processedRecords.get());
provenanceEventDetails = String.format(processStrategy.getSuccessTemplateMessage(), processedRecords.get());
}
session.getProvenanceReporter().send(flowfile, clientProperties.getRawBrokerUris(), provenanceEventDetails, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
@ -274,7 +253,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
session.getProvenanceReporter().send(
failedFlowFile,
clientProperties.getRawBrokerUris(),
String.format(PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE, processedRecords.get()),
String.format(processStrategy.getFailureTemplateMessage(), processedRecords.get()),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
}
@ -336,4 +315,110 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token);
}
interface ProcessStrategy {
void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException;
String getFailureTemplateMessage();
String getRecoverTemplateMessage();
String getSuccessTemplateMessage();
}
class ProcessRecordSetStrategy implements ProcessStrategy {
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE = "Publish failed after %d successfully published records.";
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER = "Successfully finished publishing previously failed records. Total record count: %d";
static final String PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS = "Successfully published all records. Total record count: %d";
@Override
public void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) throws IOException {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
try (final RecordReader reader = readerFactory.createRecordReader(flowfile, in, logger)) {
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(flowfile.getAttributes(), recordSet.getSchema());
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
Record record;
while ((record = recordSet.next()) != null) {
if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
processedRecords.getAndIncrement();
continue;
}
baos.reset();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowfile)) {
writer.write(record);
writer.flush();
}
final byte[] messageContent = baos.toByteArray();
publishMessage(context, flowfile, topic, messageContent);
processedRecords.getAndIncrement();
}
} catch (SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("An error happened during creating components for serialization.", e);
}
}
@Override
public String getFailureTemplateMessage() {
return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
}
@Override
public String getRecoverTemplateMessage() {
return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
}
@Override
public String getSuccessTemplateMessage() {
return PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
}
}
class ProcessDemarcatedContentStrategy implements ProcessStrategy {
static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE = "Publish failed after %d successfully published messages.";
static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER = "Successfully finished publishing previously failed messages. Total message count: %d";
static final String PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS = "Successfully published all messages. Total message count: %d";
@Override
public void process(ProcessContext context, FlowFile flowfile, InputStream in, String topic, AtomicInteger processedRecords, Long previousProcessFailedAt) {
final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue();
try (final Scanner scanner = new Scanner(in)) {
scanner.useDelimiter(demarcator);
while (scanner.hasNext()) {
final String messageContent = scanner.next();
if (previousProcessFailedAt != null && processedRecords.get() < previousProcessFailedAt) {
processedRecords.getAndIncrement();
continue;
}
publishMessage(context, flowfile, topic, messageContent.getBytes(StandardCharsets.UTF_8));
processedRecords.getAndIncrement();
}
}
}
@Override
public String getFailureTemplateMessage() {
return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE;
}
@Override
public String getRecoverTemplateMessage() {
return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER;
}
@Override
public String getSuccessTemplateMessage() {
return PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS;
}
}
}

View File

@ -240,6 +240,14 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.required(false)
.build();
public static final PropertyDescriptor BASE_MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
.name("message-demarcator")
.displayName("Message Demarcator")
.required(false)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@Override
public Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
@ -291,6 +299,12 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.explanation("both properties must be set when used.").build());
}
final boolean demarcatorIsSet = validationContext.getProperty(BASE_MESSAGE_DEMARCATOR).isSet();
if (readerIsSet && demarcatorIsSet) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
}
return results;
}

View File

@ -156,6 +156,18 @@ public class TestConsumeMQTT {
testRunner.assertValid();
}
@Test
public void testRecordAndDemarcatorConfigurationTogetherIsInvalid() throws InitializationException {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\n");
testRunner.assertNotValid();
}
@Test
public void testQoS2() throws Exception {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);

View File

@ -40,9 +40,12 @@ import java.util.Map;
import static java.util.Arrays.asList;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
import static org.apache.nifi.processors.mqtt.PublishMQTT.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessDemarcatedContentStrategy.PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_FAILURE;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_RECOVER;
import static org.apache.nifi.processors.mqtt.PublishMQTT.ProcessRecordSetStrategy.PROVENANCE_EVENT_DETAILS_ON_RECORDSET_SUCCESS;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_FAILURE;
import static org.apache.nifi.processors.mqtt.PublishMQTT.REL_SUCCESS;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
@ -70,6 +73,18 @@ public class TestPublishMQTT {
mqttTestClient = null;
}
@Test
public void testRecordAndDemarcatorConfigurationTogetherIsInvalid() throws InitializationException {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, "\n");
testRunner.assertNotValid();
}
@Test
public void testQoS0() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
@ -180,12 +195,12 @@ public class TestPublishMQTT {
}
@Test
public void testPublishRecordSet() throws InitializationException {
public void testPublishRecords() throws InitializationException {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@ -213,15 +228,15 @@ public class TestPublishMQTT {
}
@Test
public void testPublishRecordSetFailed() throws InitializationException {
public void testPublishRecordsFailed() throws InitializationException {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@ -254,8 +269,8 @@ public class TestPublishMQTT {
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@ -288,8 +303,8 @@ public class TestPublishMQTT {
Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
testRunner.setProperty(ConsumeMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_READER, createJsonRecordSetReaderService(testRunner));
testRunner.setProperty(PublishMQTT.RECORD_WRITER, createJsonRecordSetWriterService(testRunner));
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
@ -318,6 +333,149 @@ public class TestPublishMQTT {
publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
}
@Test
public void testPublishDemarcatedContent() {
mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
testRunner = initializeTestRunner(mqttTestClient);
final String demarcator = "\n";
testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final List<String> testInput = createMultipleInput();
testRunner.enqueue(String.join(demarcator, testInput).getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_SUCCESS, 3));
testRunner.assertTransferCount(REL_SUCCESS, 1);
verifyPublishedMessage(testInput.get(0).getBytes(), 2, false);
verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
verifyPublishedMessage(testInput.get(2).getBytes(), 2, false);
verifyNoMorePublished();
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile successfulFlowFile = flowFiles.get(0);
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
assertFalse(successfulFlowFile.getAttributes().containsKey(publishFailedIndexAttributeName), "Failed attribute should not be present on the FlowFile");
}
@Test
public void testPublishDemarcatedContentFailed() {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
final String demarcator = "\n";
testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final List<String> testInput = createMultipleInput();
testRunner.enqueue(String.join(demarcator, testInput).getBytes());
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE, 1));
verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
verifyPublishedMessage(testInput.get(0).getBytes(), 2, false);
verifyNoMorePublished();
List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
assertEquals(1, flowFiles.size());
final MockFlowFile failedFlowFile = flowFiles.get(0);
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
assertEquals("1", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
}
@Test
public void testContinuePublishDemarcatedContentAndFailAgainWhenPreviousPublishFailed() {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod()
.doThrow(new RuntimeException("Second publish failed."))
.when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
final String demarcator = "\n";
testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
final List<String> testInput = createMultipleInput();
final Map<String, String> attributes = new HashMap<>();
attributes.put(publishFailedIndexAttributeName, "1");
testRunner.enqueue(String.join(demarcator, testInput).getBytes(), attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_FAILURE, 2));
verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
verifyNoMorePublished();
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_FAILURE);
assertEquals(1, flowFiles.size());
final MockFlowFile failedFlowFile = flowFiles.get(0);
assertEquals("2", failedFlowFile.getAttribute(publishFailedIndexAttributeName), "Only one record is expected to be published successfully.");
}
@Test
public void testContinuePublishDemarcatedContentSuccessfullyWhenPreviousPublishFailed() {
mqttTestClient = Mockito.spy(new MqttTestClient(MqttTestClient.ConnectType.Publisher));
Mockito.doCallRealMethod().when(mqttTestClient).publish(any(), any());
testRunner = initializeTestRunner(mqttTestClient);
final String demarcator = "\n";
testRunner.setProperty(PublishMQTT.MESSAGE_DEMARCATOR, demarcator);
testRunner.setProperty(PublishMQTT.PROP_QOS, "2");
testRunner.assertValid();
final String publishFailedIndexAttributeName = testRunner.getProcessor().getIdentifier() + ATTR_PUBLISH_FAILED_INDEX_SUFFIX;
final List<String> testInput = createMultipleInput();
final Map<String, String> attributes = new HashMap<>();
attributes.put(publishFailedIndexAttributeName, "1");
testRunner.enqueue(String.join(demarcator, testInput).getBytes(), attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);
assertProvenanceEvent(String.format(PROVENANCE_EVENT_DETAILS_ON_DEMARCATED_MESSAGE_RECOVER, 3));
verify(mqttTestClient, Mockito.times(2)).publish(any(), any());
verifyPublishedMessage(testInput.get(1).getBytes(), 2, false);
verifyPublishedMessage(testInput.get(2).getBytes(), 2, false);
verifyNoMorePublished();
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile successfulFlowFile = flowFiles.get(0);
assertNull(successfulFlowFile.getAttribute(publishFailedIndexAttributeName),
publishFailedIndexAttributeName + " is expected to be removed after all remaining records have been published successfully.");
}
private void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
final Pair<String, StandardMqttMessage> lastPublished = mqttTestClient.getLastPublished();
final String lastPublishedTopic = lastPublished.getLeft();
@ -367,6 +525,10 @@ public class TestPublishMQTT {
));
}
private static List<String> createMultipleInput() {
return Arrays.asList("message1", "message2", "message3");
}
private TestRunner initializeTestRunner(MqttTestClient mqttTestClient) {
final TestRunner testRunner = TestRunners.newTestRunner(new PublishMQTT() {
@Override