NIFI-8341 Support Multi Hosts in AMQP Processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4939.
This commit is contained in:
nabmoh123 2021-03-24 16:16:50 +00:00 committed by Pierre Villard
parent 76f33e42c6
commit d0045c13b1
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 33 additions and 10 deletions

View File

@ -16,10 +16,12 @@
*/
package org.apache.nifi.amqp.processors;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -55,18 +57,26 @@ import org.apache.nifi.ssl.SSLContextService;
*/
abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProcessor {
public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder()
.name("Brokers")
.description("A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is " +
"set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();
public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder()
.name("Host Name")
.description("Network address of AMQP broker (e.g., localhost)")
.required(true)
.description("Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.")
.required(false)
.defaultValue("localhost")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
.required(true)
.description("Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.")
.required(false)
.defaultValue("5672")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
@ -128,6 +138,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(BROKERS);
properties.add(HOST);
properties.add(PORT);
properties.add(V_HOST);
@ -280,11 +291,13 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
}
}
private Address[] createHostsList(final ProcessContext context) {
String evaluatedUrls = context.getProperty(BROKERS).evaluateAttributeExpressions().getValue();
return Address.parseAddresses(evaluatedUrls);
}
protected Connection createConnection(ProcessContext context, ExecutorService executor) {
final ConnectionFactory cf = new ConnectionFactory();
cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
cf.setPassword(context.getProperty(PASSWORD).getValue());
@ -317,7 +330,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
});
try {
Connection connection = cf.newConnection(executor);
Connection connection;
if (context.getProperty(BROKERS).isSet()) {
Address[] hostsList = createHostsList(context);
connection = cf.newConnection(executor, hostsList);
} else {
cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
connection = cf.newConnection(executor);
}
return connection;
} catch (Exception e) {
throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e);

View File

@ -39,6 +39,7 @@ public class AbstractAMQPProcessorTest {
testRunner = TestRunners.newTestRunner(ConsumeAMQP.class);
testRunner.setProperty(ConsumeAMQP.QUEUE, "queue");
testRunner.setProperty(AbstractAMQPProcessor.BROKERS, "localhost:5672");
}
@Test

View File

@ -163,7 +163,7 @@ public class ConsumeAMQPTest {
private TestRunner initTestRunner(ConsumeAMQP proc) {
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.HOST, "injvm");
runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
runner.setProperty(ConsumeAMQP.USER, "user");
runner.setProperty(ConsumeAMQP.PASSWORD, "password");

View File

@ -45,7 +45,7 @@ public class PublishAMQPTest {
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HOST, "injvm");
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");
@ -110,7 +110,7 @@ public class PublishAMQPTest {
public void validateFailedPublishAndTransferToFailure() throws Exception {
PublishAMQP pubProc = new LocalPublishAMQP();
TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HOST, "injvm");
runner.setProperty(PublishAMQP.BROKERS, "injvm:5672");
runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");