NIFI-9410: Fix for ConsumeMQTT processor in stateless environment

NIFI-9410: Added displayName to the QoS processor property

This closes #5549.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Peter Gyori 2021-11-23 17:04:00 +01:00 committed by Peter Turcsanyi
parent fea6a7f6db
commit 2dcff09f7f
1 changed files with 5 additions and 3 deletions

View File

@ -145,7 +145,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor.Builder()
.name("Quality of Service(QoS)")
.description("The Quality of Service(QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
.displayName("Quality of Service (QoS)")
.description("The Quality of Service (QoS) to receive the message with. Accepts values '0', '1' or '2'; '0' for 'at most once', '1' for 'at least once', '2' for 'exactly once'.")
.required(true)
.defaultValue(ALLOWABLE_VALUE_QOS_0.getValue())
.allowableValues(
@ -387,7 +388,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
return;
}
if(context.getProperty(RECORD_READER).isSet()) {
if (context.getProperty(RECORD_READER).isSet()) {
transferQueueRecord(context, session);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
transferQueueDemarcator(context, session);
@ -440,7 +441,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
session.transfer(messageFlowfile, REL_MESSAGE);
session.commitAsync(() -> mqttQueue.remove(mqttMessage));
session.commitAsync();
mqttQueue.remove(mqttMessage);
}
}