NIFI-7890 - Added record support to ConsumeMQTT processor

This closes #4738.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Pierre Villard 2020-10-07 19:05:47 +02:00 committed by Peter Turcsanyi
parent 953327cdf5
commit 93a5823f8a
4 changed files with 578 additions and 13 deletions

View File

@ -68,9 +68,25 @@
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId> <artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.13.0-SNAPSHOT</version> <version>1.13.0-SNAPSHOT</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>

View File

@ -33,6 +33,7 @@ import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -45,13 +46,29 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor; import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage; import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -62,11 +79,13 @@ import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_DUPLICATE_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.IS_RETAINED_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.QOS_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.RECORD_COUNT_KEY;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY; import static org.apache.nifi.processors.mqtt.ConsumeMQTT.TOPIC_ATTRIBUTE_KEY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1; import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
@ -79,6 +98,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker") @CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
@SeeAlso({PublishMQTT.class}) @SeeAlso({PublishMQTT.class})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute=RECORD_COUNT_KEY, description="The number of records received"),
@WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was the message source"), @WritesAttribute(attribute=BROKER_ATTRIBUTE_KEY, description="MQTT broker that was the message source"),
@WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which message was received"), @WritesAttribute(attribute=TOPIC_ATTRIBUTE_KEY, description="MQTT topic on which message was received"),
@WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service for this message."), @WritesAttribute(attribute=QOS_ATTRIBUTE_KEY, description="The quality of service for this message."),
@ -88,13 +108,25 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single " @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The 'Max Queue Size' specifies the maximum number of messages that can be hold in memory by NiFi by a single "
+ "instance of this processor. A high value for this property could represent a lot of data being stored in memory.") + "instance of this processor. A high value for this property could represent a lot of data being stored in memory.")
public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback { public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker"; public final static String RECORD_COUNT_KEY = "record.count";
public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic"; public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos"; public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate"; public final static String QOS_ATTRIBUTE_KEY = "mqtt.qos";
public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained"; public final static String IS_DUPLICATE_ATTRIBUTE_KEY = "mqtt.isDuplicate";
public final static String IS_RETAINED_ATTRIBUTE_KEY = "mqtt.isRetained";
public final static String TOPIC_FIELD_KEY = "_topic";
public final static String QOS_FIELD_KEY = "_qos";
public final static String IS_DUPLICATE_FIELD_KEY = "_isDuplicate";
public final static String IS_RETAINED_FIELD_KEY = "_isRetained";
private final static String COUNTER_PARSE_FAILURES = "Parse Failures";
private final static String COUNTER_RECORDS_RECEIVED = "Records Received";
private final static String COUNTER_RECORDS_PROCESSED = "Records Processed";
private final static int MAX_MESSAGES_PER_FLOW_FILE = 10000;
public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder() public static final PropertyDescriptor PROP_GROUPID = new PropertyDescriptor.Builder()
.name("Group ID") .name("Group ID")
@ -131,6 +163,46 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for received messages")
.identifiesControllerService(RecordReaderFactory.class)
.required(false)
.build();
public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before writing it to a FlowFile")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(false)
.build();
public static final PropertyDescriptor ADD_ATTRIBUTES_AS_FIELDS = new PropertyDescriptor.Builder()
.name("add-attributes-as-fields")
.displayName("Add attributes as fields")
.description("If using the Records reader/writer and if setting this property to true, default fields "
+ "are going to be added in each record: _topic, _qos, _isDuplicate, _isRetained.")
.required(true)
.defaultValue("true")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
.name("message-demarcator")
.displayName("Message Demarcator")
.required(false)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.description("With this property, you have an option to output FlowFiles which contains multiple messages. "
+ "This property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart "
+ "multiple messages. This is an optional property; if not provided, and if not defining a "
+ "Reader/Writer, each message received will result in a single FlowFile. To enter special "
+ "character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS.")
.build();
private volatile int qos; private volatile int qos;
private volatile String topicPrefix = ""; private volatile String topicPrefix = "";
private volatile String topicFilter; private volatile String topicFilter;
@ -143,6 +215,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
.description("The MQTT message output") .description("The MQTT message output")
.build(); .build();
public static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
.name("parse.failure")
.description("If a message cannot be parsed using the configured Record Reader, the contents of the "
+ "message will be routed to this Relationship as its own individual FlowFile.")
.autoTerminateDefault(true) // to make sure flow are still valid after upgrades
.build();
private static final List<PropertyDescriptor> descriptors; private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships; private static final Set<Relationship> relationships;
@ -152,10 +231,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
innerDescriptorsList.add(PROP_TOPIC_FILTER); innerDescriptorsList.add(PROP_TOPIC_FILTER);
innerDescriptorsList.add(PROP_QOS); innerDescriptorsList.add(PROP_QOS);
innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE); innerDescriptorsList.add(PROP_MAX_QUEUE_SIZE);
innerDescriptorsList.add(RECORD_READER);
innerDescriptorsList.add(RECORD_WRITER);
innerDescriptorsList.add(ADD_ATTRIBUTES_AS_FIELDS);
innerDescriptorsList.add(MESSAGE_DEMARCATOR);
descriptors = Collections.unmodifiableList(innerDescriptorsList); descriptors = Collections.unmodifiableList(innerDescriptorsList);
final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>(); final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>();
innerRelationshipsSet.add(REL_MESSAGE); innerRelationshipsSet.add(REL_MESSAGE);
innerRelationshipsSet.add(REL_PARSE_FAILURE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet); relationships = Collections.unmodifiableSet(innerRelationshipsSet);
} }
@ -200,6 +284,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet(); final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet();
final boolean clientIDwithEL = context.getProperty(PROP_CLIENTID).isExpressionLanguagePresent(); final boolean clientIDwithEL = context.getProperty(PROP_CLIENTID).isExpressionLanguagePresent();
final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet(); final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet();
if (!clientIDwithEL && clientIDSet && groupIDSet) { if (!clientIDwithEL && clientIDSet && groupIDSet) {
results.add(new ValidationResult.Builder() results.add(new ValidationResult.Builder()
.subject("Client ID and Group ID").valid(false) .subject("Client ID and Group ID").valid(false)
@ -209,6 +294,19 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
.build()); .build());
} }
final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
if((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("Both Record Reader and Writer must be set when used").build());
}
final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
if(readerIsSet && demarcatorIsSet) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
.explanation("You cannot use both a demarcator and a Reader/Writer").build());
}
return results; return results;
} }
@ -257,7 +355,13 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) { if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
logger.info("Finishing processing leftover messages"); logger.info("Finishing processing leftover messages");
ProcessSession session = processSessionFactory.createSession(); ProcessSession session = processSessionFactory.createSession();
transferQueue(session); if(context.getProperty(RECORD_READER).isSet()) {
transferQueueRecord(context, session);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
transferQueueDemarcator(context, session);
} else {
transferQueue(session);
}
} else { } else {
if (mqttQueue!= null && !mqttQueue.isEmpty()){ if (mqttQueue!= null && !mqttQueue.isEmpty()){
throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " + throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " +
@ -279,10 +383,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
} }
if (mqttQueue.isEmpty()) { if (mqttQueue.isEmpty()) {
context.yield();
return; return;
} }
transferQueue(session); if(context.getProperty(RECORD_READER).isSet()) {
transferQueueRecord(context, session);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
transferQueueDemarcator(context, session);
} else {
transferQueue(session);
}
} }
private void initializeClient(ProcessContext context) { private void initializeClient(ProcessContext context) {
@ -308,8 +419,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
private void transferQueue(ProcessSession session){ private void transferQueue(ProcessSession session){
while (!mqttQueue.isEmpty()) { while (!mqttQueue.isEmpty()) {
FlowFile messageFlowfile = session.create();
final MQTTQueueMessage mqttMessage = mqttQueue.peek(); final MQTTQueueMessage mqttMessage = mqttQueue.peek();
FlowFile messageFlowfile = session.create();
Map<String, String> attrs = new HashMap<>(); Map<String, String> attrs = new HashMap<>();
attrs.put(BROKER_ATTRIBUTE_KEY, broker); attrs.put(BROKER_ATTRIBUTE_KEY, broker);
@ -323,18 +434,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() { messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
@Override @Override
public void process(final OutputStream out) throws IOException { public void process(final OutputStream out) throws IOException {
out.write(mqttMessage.getPayload()); out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
} }
}); });
String transitUri = new StringBuilder(broker).append(mqttMessage.getTopic()).toString(); session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
session.getProvenanceReporter().receive(messageFlowfile, transitUri);
session.transfer(messageFlowfile, REL_MESSAGE); session.transfer(messageFlowfile, REL_MESSAGE);
session.commit(); session.commit();
if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) { if (!mqttQueue.remove(mqttMessage) && logger.isWarnEnabled()) {
logger.warn(new StringBuilder("FlowFile ") logger.warn(new StringBuilder("FlowFile ")
.append(messageFlowfile.getAttribute(CoreAttributes.UUID.key())) .append(messageFlowfile.getAttribute(CoreAttributes.UUID.key()))
.append(" for Mqtt message ") .append(" for MQTT message ")
.append(mqttMessage) .append(mqttMessage)
.append(" had already been removed from queue, possible duplication of flow files") .append(" had already been removed from queue, possible duplication of flow files")
.toString()); .toString());
@ -342,6 +452,217 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
} }
} }
/**
 * Drains up to {@code MAX_MESSAGES_PER_FLOW_FILE} messages from the internal queue into a
 * single FlowFile, writing the configured demarcator (UTF-8) after each message payload.
 * Commits the session when done.
 */
private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
    final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);

    FlowFile messageFlowfile = session.create();
    // putAttribute returns a new FlowFile reference; it must be captured or the
    // attribute is silently lost (FlowFiles are immutable).
    messageFlowfile = session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);

    messageFlowfile = session.append(messageFlowfile, out -> {
        int messageCount = 0;
        while (!mqttQueue.isEmpty() && messageCount < MAX_MESSAGES_PER_FLOW_FILE) {
            final MQTTQueueMessage mqttMessage = mqttQueue.poll();
            if (mqttMessage == null) {
                // queue may have been drained between isEmpty() and poll()
                break;
            }
            out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
            out.write(demarcator);
            session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
            messageCount++;
        }
    });

    session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(topicPrefix, topicFilter));
    session.transfer(messageFlowfile, REL_MESSAGE);
    session.commit();
}
/**
 * Routes a single MQTT message that could not be parsed/written as a record to the
 * {@code parse.failure} relationship, carrying the standard MQTT attributes and the raw payload.
 */
private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
    FlowFile messageFlowfile = session.create();

    final Map<String, String> attrs = new HashMap<>();
    attrs.put(BROKER_ATTRIBUTE_KEY, broker);
    attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
    attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
    attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
    attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));

    messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);

    // Guard against a null payload, consistent with transferQueue/transferQueueRecord.
    messageFlowfile = session.write(messageFlowfile,
            out -> out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()));

    session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
    session.transfer(messageFlowfile, REL_PARSE_FAILURE);
    session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
}
/**
 * Drains up to {@code MAX_MESSAGES_PER_FLOW_FILE} messages from the internal queue, parsing each
 * with the configured Record Reader and writing all resulting records into a single FlowFile via
 * the configured Record Writer. Messages that cannot be parsed or written are routed individually
 * to {@code parse.failure}. If the batch fails as a whole, already-consumed messages are
 * re-offered to the internal queue (best effort) and a ProcessException is thrown.
 */
private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
    final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

    FlowFile flowFile = session.create();
    // putAttribute returns a new FlowFile reference; it must be captured or the
    // attribute is silently lost (FlowFiles are immutable).
    flowFile = session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);

    final Map<String, String> attributes = new HashMap<>();
    final AtomicInteger recordCount = new AtomicInteger();

    // Messages whose records were successfully written; used to requeue them if the batch fails.
    final List<MQTTQueueMessage> doneList = new ArrayList<>();

    RecordSetWriter writer = null;
    boolean isWriterInitialized = false;

    int i = 0;
    try {
        while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
            final MQTTQueueMessage mqttMessage = mqttQueue.poll();
            if (mqttMessage == null) {
                break;
            }

            final byte[] recordBytes = mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload();

            try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
                final RecordReader reader;

                try {
                    reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
                } catch (final Exception e) {
                    logger.error("Failed to parse the message from the internal queue, sending to the parse failure relationship", e);
                    transferFailure(session, mqttMessage);
                    continue;
                }

                try {
                    Record record;
                    while ((record = reader.nextRecord()) != null) {

                        if (!isWriterInitialized) {
                            final RecordSchema recordSchema = record.getSchema();
                            final OutputStream rawOut = session.write(flowFile);

                            RecordSchema writeSchema;
                            try {
                                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
                                if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
                                    // Extend the write schema with the MQTT metadata fields.
                                    final List<RecordField> fields = new ArrayList<>(writeSchema.getFields());
                                    fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
                                    fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
                                    fields.add(new RecordField(IS_DUPLICATE_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
                                    fields.add(new RecordField(IS_RETAINED_FIELD_KEY, RecordFieldType.BOOLEAN.getDataType()));
                                    writeSchema = new SimpleRecordSchema(fields);
                                }
                            } catch (final Exception e) {
                                logger.error("Failed to obtain Schema for FlowFile, sending to the parse failure relationship", e);
                                transferFailure(session, mqttMessage);
                                continue;
                            }

                            writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
                            writer.beginRecordSet();
                            // Mark initialized as soon as the writer exists; setting this only after a
                            // successful write() would re-open the FlowFile content stream if the very
                            // first record failed to serialize.
                            isWriterInitialized = true;
                        }

                        try {
                            if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
                                record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
                                record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
                                record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
                                record.setValue(IS_DUPLICATE_FIELD_KEY, mqttMessage.isDuplicate());
                            }
                            writer.write(record);
                            doneList.add(mqttMessage);
                        } catch (final RuntimeException re) {
                            logger.error("Failed to write message using the configured Record Writer, sending to the parse failure relationship", re);
                            transferFailure(session, mqttMessage);
                            continue;
                        }

                        session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
                        i++;
                    }
                } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
                    logger.error("Failed to write message, sending to the parse failure relationship", e);
                    transferFailure(session, mqttMessage);
                    continue;
                }
            } catch (Exception e) {
                logger.error("Failed to write message, sending to the parse failure relationship", e);
                transferFailure(session, mqttMessage);
                continue;
            }
        }

        if (writer != null) {
            final WriteResult writeResult = writer.finishRecordSet();
            attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
            attributes.putAll(writeResult.getAttributes());
            recordCount.set(writeResult.getRecordCount());
        }
    } catch (final Exception e) {
        context.yield();

        // Best effort to push already-consumed messages back into the internal queue
        // so they are not lost when the batch fails.
        int numberOfMessages = 0;
        for (MQTTQueueMessage done : doneList) {
            try {
                mqttQueue.offer(done, 1, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore interrupt status
                numberOfMessages++;
                if (getLogger().isDebugEnabled()) {
                    logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex);
                }
            }
        }

        if (numberOfMessages > 0) {
            logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages});
        }

        throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e);
    } finally {
        closeWriter(writer);
    }

    if (recordCount.get() == 0) {
        // no record was successfully written — drop the empty FlowFile
        session.remove(flowFile);
        return;
    }

    // putAllAttributes returns a new FlowFile reference; capture it so record.count,
    // mime.type, etc. are actually applied to the transferred FlowFile.
    flowFile = session.putAllAttributes(flowFile, attributes);
    session.getProvenanceReporter().receive(flowFile, getTransitUri(topicPrefix, topicFilter));
    session.transfer(flowFile, REL_MESSAGE);
    session.commit();

    final int count = recordCount.get();
    session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false);
    getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile});
}
/**
 * Closes the given Record Writer if one was created, logging (rather than propagating)
 * any failure so that cleanup never masks the primary outcome.
 */
private void closeWriter(final RecordSetWriter writer) {
    if (writer == null) {
        return;
    }
    try {
        writer.close();
    } catch (final Exception e) {
        logger.warn("Failed to close Record Writer", e);
    }
}
/**
 * Builds the provenance transit URI by appending the given segments to the broker URI.
 * {@code brokerUri} always ends with '/' (normalized in onScheduled), so segments append cleanly.
 */
private String getTransitUri(String... appends) {
    return brokerUri + String.join("", appends);
}
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause); logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);

View File

@ -62,6 +62,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
protected ComponentLog logger; protected ComponentLog logger;
protected IMqttClient mqttClient; protected IMqttClient mqttClient;
protected volatile String broker; protected volatile String broker;
protected volatile String brokerUri;
protected volatile String clientID; protected volatile String clientID;
protected MqttConnectOptions connOpts; protected MqttConnectOptions connOpts;
protected MemoryPersistence persistence = new MemoryPersistence(); protected MemoryPersistence persistence = new MemoryPersistence();
@ -314,6 +315,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
protected void onScheduled(final ProcessContext context){ protected void onScheduled(final ProcessContext context){
broker = context.getProperty(PROP_BROKER_URI).getValue(); broker = context.getProperty(PROP_BROKER_URI).getValue();
brokerUri = broker.endsWith("/") ? broker : broker + "/";
clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue(); clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
if (clientID == null) { if (clientID == null) {

View File

@ -20,11 +20,15 @@ package org.apache.nifi.processors.mqtt.common;
import io.moquette.proto.messages.AbstractMessage; import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.PublishMessage; import io.moquette.proto.messages.PublishMessage;
import io.moquette.server.Server; import io.moquette.server.Server;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.ConsumeMQTT; import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners;
@ -53,6 +57,7 @@ import static org.junit.Assert.assertTrue;
public abstract class TestConsumeMqttCommon { public abstract class TestConsumeMqttCommon {
public int PUBLISH_WAIT_MS = 1000; public int PUBLISH_WAIT_MS = 1000;
public static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
public Server MQTT_server; public Server MQTT_server;
public TestRunner testRunner; public TestRunner testRunner;
@ -409,6 +414,227 @@ public abstract class TestConsumeMqttCommon {
flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false"); flowFile.assertAttributeEquals(IS_RETAINED_ATTRIBUTE_KEY, "false");
} }
@Test
public void testConsumeRecordsWithAddedFields() throws Exception {
    // Configure a JSON reader/writer pair; the writer inherits the record schema so the
    // processor-added _topic/_qos/_isDuplicate/_isRetained fields appear in the output.
    testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
    testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");

    final JsonTreeReader jsonReader = new JsonTreeReader();
    testRunner.addControllerService("record-reader", jsonReader);
    testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
    testRunner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    testRunner.addControllerService("record-writer", jsonWriter);
    testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
    testRunner.enableControllerService(jsonWriter);

    testRunner.assertValid();

    ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
    consumeMQTT.onScheduled(testRunner.getProcessContext());
    reconnect(consumeMQTT, testRunner.getProcessContext());

    Thread.sleep(PUBLISH_WAIT_MS);

    assertTrue(isConnected(consumeMQTT));

    PublishMessage testMessage = new PublishMessage();
    testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
    testMessage.setTopicName("testTopic");
    testMessage.setDupFlag(false);
    testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
    testMessage.setRetainFlag(false);

    // This payload is not valid JSON and must be routed to parse.failure.
    PublishMessage badMessage = new PublishMessage();
    badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
    badMessage.setTopicName("testTopic");
    badMessage.setDupFlag(false);
    badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
    badMessage.setRetainFlag(false);

    internalPublish(testMessage);
    internalPublish(badMessage);
    internalPublish(testMessage);

    Thread.sleep(PUBLISH_WAIT_MS);
    testRunner.run(1, false, false);

    // Both good messages are batched into a single record-oriented FlowFile.
    List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
    assertEquals(1, flowFiles.size());
    assertEquals("[{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false},"
            + "{\"name\":\"Apache NiFi\",\"_topic\":\"testTopic\",\"_qos\":0,\"_isDuplicate\":false,\"_isRetained\":false}]",
            new String(flowFiles.get(0).toByteArray()));

    List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
    assertEquals(1, badFlowFiles.size());
    assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));

    // clean runner by removing records reader/writer
    testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
    testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
@Test
public void testConsumeDemarcator() throws Exception {
    // Configure the processor to concatenate queued messages using a demarcator.
    // Note: the property value is the two-character literal backslash-n, not a
    // newline — the output assertion below expects the same literal sequence.
    testRunner.setProperty(ConsumeMQTT.MESSAGE_DEMARCATOR, "\\n");
    testRunner.assertValid();

    ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
    consumeMQTT.onScheduled(testRunner.getProcessContext());
    reconnect(consumeMQTT, testRunner.getProcessContext());

    Thread.sleep(PUBLISH_WAIT_MS);

    assertTrue(isConnected(consumeMQTT));

    PublishMessage testMessage = new PublishMessage();
    testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
    testMessage.setTopicName("testTopic");
    testMessage.setDupFlag(false);
    testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
    testMessage.setRetainFlag(false);

    PublishMessage badMessage = new PublishMessage();
    badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
    badMessage.setTopicName("testTopic");
    badMessage.setDupFlag(false);
    badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
    badMessage.setRetainFlag(false);

    internalPublish(testMessage);
    internalPublish(badMessage);
    internalPublish(testMessage);

    // Double wait to make sure all three messages have arrived before triggering.
    Thread.sleep(PUBLISH_WAIT_MS);
    Thread.sleep(PUBLISH_WAIT_MS);

    testRunner.run(1, false, false);

    // All three payloads (including the non-JSON one) are concatenated with the
    // demarcator into a single flow file; nothing is routed to parse failure
    // because no record reader is configured.
    List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
    assertEquals(1, flowFiles.size());

    assertEquals("{\"name\":\"Apache NiFi\"}\\n"
            + THIS_IS_NOT_JSON + "\\n"
            + "{\"name\":\"Apache NiFi\"}\\n",
            new String(flowFiles.get(0).toByteArray()));

    List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
    assertEquals(0, badFlowFiles.size());

    // clean runner by removing message demarcator
    testRunner.removeProperty(ConsumeMQTT.MESSAGE_DEMARCATOR);
}
@Test
public void testConsumeRecordsWithoutAddedFields() throws Exception {
    // Record-oriented consumption with attribute fields disabled: the output
    // records must contain only the original payload fields (no _topic/_qos/etc.).
    testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
    testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
    testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");

    final JsonTreeReader jsonReader = new JsonTreeReader();
    testRunner.addControllerService("record-reader", jsonReader);
    testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
    testRunner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    testRunner.addControllerService("record-writer", jsonWriter);
    testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
    testRunner.enableControllerService(jsonWriter);

    testRunner.assertValid();

    ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
    consumeMQTT.onScheduled(testRunner.getProcessContext());
    reconnect(consumeMQTT, testRunner.getProcessContext());

    Thread.sleep(PUBLISH_WAIT_MS);

    assertTrue(isConnected(consumeMQTT));

    PublishMessage testMessage = new PublishMessage();
    testMessage.setPayload(ByteBuffer.wrap("{\"name\":\"Apache NiFi\"}".getBytes()));
    testMessage.setTopicName("testTopic");
    testMessage.setDupFlag(false);
    testMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
    testMessage.setRetainFlag(false);

    PublishMessage badMessage = new PublishMessage();
    badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
    badMessage.setTopicName("testTopic");
    badMessage.setDupFlag(false);
    badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
    badMessage.setRetainFlag(false);

    internalPublish(testMessage);
    internalPublish(badMessage);
    internalPublish(testMessage);

    Thread.sleep(PUBLISH_WAIT_MS);

    testRunner.run(1, false, false);

    // The two parseable messages are written as a single record set;
    // the unparseable payload is routed to the parse-failure relationship as-is.
    List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_MESSAGE);
    assertEquals(1, flowFiles.size());
    assertEquals("[{\"name\":\"Apache NiFi\"},{\"name\":\"Apache NiFi\"}]", new String(flowFiles.get(0).toByteArray()));

    List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
    assertEquals(1, badFlowFiles.size());
    assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));

    // clean runner by removing records reader/writer
    testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
    testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
@Test
public void testConsumeRecordsOnlyBadData() throws Exception {
    // When every queued message fails record parsing, nothing should reach the
    // message relationship and each bad payload is forwarded unchanged to
    // the parse-failure relationship.
    testRunner.setProperty(ConsumeMQTT.RECORD_READER, "record-reader");
    testRunner.setProperty(ConsumeMQTT.RECORD_WRITER, "record-writer");
    testRunner.setProperty(ConsumeMQTT.ADD_ATTRIBUTES_AS_FIELDS, "false");

    final JsonTreeReader jsonReader = new JsonTreeReader();
    testRunner.addControllerService("record-reader", jsonReader);
    testRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "infer-schema");
    testRunner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    testRunner.addControllerService("record-writer", jsonWriter);
    testRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
    testRunner.enableControllerService(jsonWriter);

    testRunner.assertValid();

    ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
    consumeMQTT.onScheduled(testRunner.getProcessContext());
    reconnect(consumeMQTT, testRunner.getProcessContext());

    Thread.sleep(PUBLISH_WAIT_MS);

    assertTrue(isConnected(consumeMQTT));

    PublishMessage badMessage = new PublishMessage();
    badMessage.setPayload(ByteBuffer.wrap(THIS_IS_NOT_JSON.getBytes()));
    badMessage.setTopicName("testTopic");
    badMessage.setDupFlag(false);
    badMessage.setQos(AbstractMessage.QOSType.MOST_ONE);
    badMessage.setRetainFlag(false);

    internalPublish(badMessage);

    Thread.sleep(PUBLISH_WAIT_MS);

    testRunner.run(1, false, false);

    List<MockFlowFile> badFlowFiles = testRunner.getFlowFilesForRelationship(ConsumeMQTT.REL_PARSE_FAILURE);
    assertEquals(1, badFlowFiles.size());
    assertEquals(THIS_IS_NOT_JSON, new String(badFlowFiles.get(0).toByteArray()));

    // clean runner by removing records reader/writer
    testRunner.removeProperty(ConsumeMQTT.RECORD_READER);
    testRunner.removeProperty(ConsumeMQTT.RECORD_WRITER);
}
private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException { private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient"); Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
f.setAccessible(true); f.setAccessible(true);