diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java index a5f63d57d3..0a0418ac55 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java @@ -16,10 +16,12 @@ */ package org.apache.nifi.amqp.processors; +import com.rabbitmq.client.Address; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultSaslConfig; import com.rabbitmq.client.impl.DefaultExceptionHandler; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -55,18 +57,26 @@ import org.apache.nifi.ssl.SSLContextService; */ abstract class AbstractAMQPProcessor extends AbstractProcessor { + public static final PropertyDescriptor BROKERS = new PropertyDescriptor.Builder() + .name("Brokers") + .description("A comma-separated list of known AMQP Brokers in the format : (e.g., localhost:5672). If this is " + + "set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .build(); public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() .name("Host Name") - .description("Network address of AMQP broker (e.g., localhost)") - .required(true) + .description("Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.") + .required(false) .defaultValue("localhost") .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() .name("Port") - .description("Numeric value identifying Port of AMQP broker (e.g., 5671)") - .required(true) + .description("Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.") + .required(false) .defaultValue("5672") .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.PORT_VALIDATOR) @@ -128,6 +138,7 @@ abstract class AbstractAMQPProcessor extends AbstractProce static { final List properties = new ArrayList<>(); + properties.add(BROKERS); properties.add(HOST); properties.add(PORT); properties.add(V_HOST); @@ -280,11 +291,13 @@ abstract class AbstractAMQPProcessor extends AbstractProce } } + private Address[] createHostsList(final ProcessContext context) { + String evaluatedUrls = context.getProperty(BROKERS).evaluateAttributeExpressions().getValue(); + return Address.parseAddresses(evaluatedUrls); + } protected Connection createConnection(ProcessContext context, ExecutorService executor) { final ConnectionFactory cf = new ConnectionFactory(); - cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue()); - cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue())); cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue()); cf.setPassword(context.getProperty(PASSWORD).getValue()); @@ -317,7 +330,16 @@ abstract class AbstractAMQPProcessor extends AbstractProce }); try { - Connection connection = cf.newConnection(executor); + Connection connection; + if (context.getProperty(BROKERS).isSet()) { + Address[] hostsList = createHostsList(context); + connection = cf.newConnection(executor, hostsList); + } else { + cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue()); + cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue())); + connection = cf.newConnection(executor); + } + return connection; } catch (Exception e) { throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e); diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java index 19dd1ea6dc..2273b56a15 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java @@ -39,6 +39,7 @@ public class AbstractAMQPProcessorTest { testRunner = TestRunners.newTestRunner(ConsumeAMQP.class); testRunner.setProperty(ConsumeAMQP.QUEUE, "queue"); + testRunner.setProperty(AbstractAMQPProcessor.BROKERS, "localhost:5672"); } @Test diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 2debebe25d..125593dbfe 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -163,7 +163,7 @@ public class ConsumeAMQPTest { private TestRunner initTestRunner(ConsumeAMQP proc) { TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ConsumeAMQP.HOST, "injvm"); + runner.setProperty(ConsumeAMQP.BROKERS, "injvm:5672"); runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); runner.setProperty(ConsumeAMQP.USER, "user"); runner.setProperty(ConsumeAMQP.PASSWORD, "password"); diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index 556d7b99f2..345372a927 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -45,7 +45,7 @@ public class PublishAMQPTest { public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { final PublishAMQP pubProc = new LocalPublishAMQP(); final TestRunner runner = TestRunners.newTestRunner(pubProc); - runner.setProperty(PublishAMQP.HOST, "injvm"); + runner.setProperty(PublishAMQP.BROKERS, "injvm:5672"); runner.setProperty(PublishAMQP.EXCHANGE, "myExchange"); runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); runner.setProperty(PublishAMQP.USER, "user"); @@ -110,7 +110,7 @@ public class PublishAMQPTest { public void validateFailedPublishAndTransferToFailure() throws Exception { PublishAMQP pubProc = new LocalPublishAMQP(); TestRunner runner = TestRunners.newTestRunner(pubProc); - runner.setProperty(PublishAMQP.HOST, "injvm"); + runner.setProperty(PublishAMQP.BROKERS, "injvm:5672"); runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone"); runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); runner.setProperty(PublishAMQP.USER, "user");