NIFI-7894 - ConsumeMQTT - allow EL on Client ID property with shared subscription

This closes #4586.

Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
Pierre Villard 2020-10-08 11:32:06 +02:00 committed by Peter Turcsanyi
parent c9d778a8ee
commit 80eb570ee1
2 changed files with 30 additions and 2 deletions

View File

@ -198,9 +198,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
}
final boolean clientIDSet = context.getProperty(PROP_CLIENTID).isSet();
final boolean clientIDwithEL = context.getProperty(PROP_CLIENTID).isExpressionLanguagePresent();
final boolean groupIDSet = context.getProperty(PROP_GROUPID).isSet();
if (clientIDSet && groupIDSet) {
results.add(new ValidationResult.Builder().subject("Client ID and Group ID").valid(false).explanation("if client ID is not unique, multiple nodes cannot join the consumer group").build());
if (!clientIDwithEL && clientIDSet && groupIDSet) {
results.add(new ValidationResult.Builder()
.subject("Client ID and Group ID").valid(false)
.explanation("if client ID is not unique, multiple nodes cannot join the consumer group (if you want "
+ "to set the client ID, please use expression language to make sure each node in the NiFi "
+ "cluster gets a unique client ID with something like ${hostname()}).")
.build());
}
return results;
@ -230,6 +236,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback
if (context.getProperty(PROP_GROUPID).isSet()) {
topicPrefix = "$share/" + context.getProperty(PROP_GROUPID).getValue() + "/";
} else {
topicPrefix = "";
}
scheduled.set(true);

View File

@ -27,6 +27,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.Test;
@ -59,6 +60,25 @@ public abstract class TestConsumeMqttCommon {
public abstract void internalPublish(PublishMessage publishMessage);
@Test
public void testClientIDConfiguration() {
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "tcp://localhost:1883");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");
runner.assertValid();
runner.setProperty(ConsumeMQTT.PROP_GROUPID, "group");
runner.assertNotValid();
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "${hostname()}");
runner.assertValid();
runner.removeProperty(ConsumeMQTT.PROP_CLIENTID);
runner.assertValid();
}
@Test
public void testLastWillConfig() throws Exception {
testRunner.setProperty(ConsumeMQTT.PROP_LAST_WILL_MESSAGE, "lastWill message");