NIFI-12576 MQTT processors allow path component in Broker URI

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8213.
This commit is contained in:
Peter Turcsanyi 2024-01-07 19:11:16 +01:00 committed by Pierre Villard
parent d9cd62a217
commit f55093d103
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
2 changed files with 9 additions and 6 deletions

View File

@ -48,7 +48,6 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
@ -264,10 +263,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
boolean sameSchemeValidationErrorAdded = false;
boolean sslValidationErrorAdded = false;
for(URI brokerUri : brokerUris) {
if (!EMPTY.equals(brokerUri.getPath())) {
results.add(new ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false)
.explanation("the broker URI cannot have a path. It currently is: " + brokerUri.getPath()).build());
}
final String scheme = brokerUri.getScheme();
if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, scheme)) {
results.add(new ValidationResult.Builder().subject(PROP_BROKER_URI.getName()).valid(false)

View File

@ -68,7 +68,9 @@ public class TestConsumeMQTT {
private static final int PUBLISH_WAIT_MS = 0;
private static final String THIS_IS_NOT_JSON = "ThisIsNotAJSON";
private static final String BROKER_URI = "tcp://localhost:1883";
private static final String SSL_BROKER_URI = "ssl://localhost:1883";
private static final String SSL_BROKER_URI = "ssl://localhost:8883";
private static final String WS_BROKER_URI = "ws://localhost:15675/ws";
private static final String WSS_BROKER_URI = "wss://localhost:15676/ws";
private static final String CLUSTERED_BROKER_URI = "tcp://localhost:1883,tcp://localhost:1884";
private static final String SSL_CLUSTERED_BROKER_URI = "ssl://localhost:1883,ssl://localhost:1884";
private static final String INVALID_BROKER_URI = "http://localhost:1883";
@ -135,6 +137,9 @@ public class TestConsumeMQTT {
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, CLUSTERED_BROKER_URI);
testRunner.assertValid();
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, WS_BROKER_URI);
testRunner.assertValid();
}
@Test
@ -154,6 +159,9 @@ public class TestConsumeMQTT {
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, SSL_BROKER_URI);
testRunner.assertValid();
testRunner.setProperty(ConsumeMQTT.PROP_BROKER_URI, WSS_BROKER_URI);
testRunner.assertValid();
}
@Test