diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java index 4a2e49a7b8..3344dc9b3f 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java @@ -20,14 +20,16 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultSaslConfig; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import javax.net.ssl.SSLContext; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -74,16 +76,14 @@ abstract class AbstractAMQPProcessor extends AbstractProce public static final PropertyDescriptor USER = new PropertyDescriptor.Builder() .name("User Name") .description("User Name used for authentication and authorization.") - .required(true) - .defaultValue("guest") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() .name("Password") .description("Password used for authentication and authorization.") - .required(true) - .defaultValue("guest") + .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .sensitive(true) .build(); @@ -103,8 +103,8 @@ abstract class AbstractAMQPProcessor extends AbstractProce .build(); public static final PropertyDescriptor USE_CERT_AUTHENTICATION = new PropertyDescriptor.Builder() .name("cert-authentication") - .displayName("Use Certificate Authentication") - .description("Authenticate using the SSL certificate common name rather than user name/password.") + .displayName("Use Client Certificate Authentication") + .description("Authenticate using the SSL certificate rather than user name/password.") .required(false) .defaultValue("false") .allowableValues("true", "false") @@ -113,12 +113,10 @@ abstract class AbstractAMQPProcessor extends AbstractProce public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("ssl-client-auth") .displayName("Client Auth") - .description("Client authentication policy when connecting to secure (TLS/SSL) AMQP broker. " - + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " - + "has been defined and enabled.") + .description("The property has no effect and therefore deprecated.") .required(false) .allowableValues(SslContextFactory.ClientAuth.values()) - .defaultValue("REQUIRED") + .defaultValue("NONE") .build(); private static final List propertyDescriptors; @@ -144,6 +142,46 @@ abstract class AbstractAMQPProcessor extends AbstractProce private final BlockingQueue> resourceQueue = new LinkedBlockingQueue<>(); + @Override + protected Collection customValidate(ValidationContext context) { + List results = new ArrayList<>(super.customValidate(context)); + + boolean userConfigured = context.getProperty(USER).isSet(); + boolean passwordConfigured = context.getProperty(PASSWORD).isSet(); + boolean sslServiceConfigured = context.getProperty(SSL_CONTEXT_SERVICE).isSet(); + boolean useCertAuthentication = context.getProperty(USE_CERT_AUTHENTICATION).asBoolean(); + + if (useCertAuthentication && (userConfigured || passwordConfigured)) { + results.add(new ValidationResult.Builder() + .subject("Authentication configuration") + .valid(false) + .explanation(String.format("'%s' with '%s' and '%s' cannot be configured at the same time", + USER.getDisplayName(), PASSWORD.getDisplayName(), + USE_CERT_AUTHENTICATION.getDisplayName())) + .build()); + } + + if (!useCertAuthentication && (!userConfigured || !passwordConfigured)) { + results.add(new ValidationResult.Builder() + .subject("Authentication configuration") + .valid(false) + .explanation(String.format("either '%s' with '%s' or '%s' must be configured", + USER.getDisplayName(), PASSWORD.getDisplayName(), + USE_CERT_AUTHENTICATION.getDisplayName())) + .build()); + } + + if (useCertAuthentication && !sslServiceConfigured) { + results.add(new ValidationResult.Builder() + .subject("SSL configuration") + .valid(false) + .explanation(String.format("'%s' has been set but no '%s' configured", + USE_CERT_AUTHENTICATION.getDisplayName(), SSL_CONTEXT_SERVICE.getDisplayName())) + .build()); + } + return results; + } + /** * Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the * implementation of {@link #processResource} method for further processing. @@ -216,32 +254,15 @@ abstract class AbstractAMQPProcessor extends AbstractProce } // handles TLS/SSL aspects - final Boolean useCertAuthentication = context.getProperty(USE_CERT_AUTHENTICATION).asBoolean(); final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - // if the property to use cert authentication is set but the SSL service hasn't been configured, throw an exception. - if (useCertAuthentication && sslService == null) { - throw new IllegalStateException("This processor is configured to use cert authentication, " + - "but the SSL Context Service hasn't been configured. You need to configure the SSL Context Service."); - } - final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue(); + final Boolean useCertAuthentication = context.getProperty(USE_CERT_AUTHENTICATION).asBoolean(); if (sslService != null) { - final SslContextFactory.ClientAuth clientAuth; - if (StringUtils.isBlank(rawClientAuth)) { - clientAuth = SslContextFactory.ClientAuth.REQUIRED; - } else { - try { - clientAuth = SslContextFactory.ClientAuth.valueOf(rawClientAuth); - } catch (final IllegalArgumentException iae) { - throw new IllegalStateException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", - rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", "))); - } - } - final SSLContext sslContext = sslService.createSSLContext(clientAuth); + final SSLContext sslContext = sslService.createSSLContext(SslContextFactory.ClientAuth.NONE); cf.useSslProtocol(sslContext); if (useCertAuthentication) { - // this tells the factory to use the cert common name for authentication and not user name and password + // this tells the factory to use the client certificate for authentication and not user name and password // REF: https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl cf.setSaslConfig(DefaultSaslConfig.EXTERNAL); } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java index 8cbb8a3d06..19dd1ea6dc 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java @@ -16,10 +16,7 @@ */ package org.apache.nifi.amqp.processors; -import com.rabbitmq.client.Connection; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -35,55 +32,68 @@ import static org.mockito.Mockito.when; */ public class AbstractAMQPProcessorTest { - MockAbstractAMQPProcessor processor; private TestRunner testRunner; @Before - public void setUp() throws Exception { - processor = new MockAbstractAMQPProcessor(); - testRunner = TestRunners.newTestRunner(processor); + public void setUp() { + testRunner = TestRunners.newTestRunner(ConsumeAMQP.class); + + testRunner.setProperty(ConsumeAMQP.QUEUE, "queue"); } - @Test(expected = IllegalStateException.class) - public void testConnectToCassandraWithSSLBadClientAuth() throws Exception { + @Test + public void testValidUserPassword() { + testRunner.setProperty(AbstractAMQPProcessor.USER, "user"); + testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "password"); + + testRunner.assertValid(); + } + + @Test + public void testNotValidUserMissing() { + testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "password"); + + testRunner.assertNotValid(); + } + + @Test + public void testNotValidPasswordMissing() { + testRunner.setProperty(AbstractAMQPProcessor.USER, "user"); + + testRunner.assertNotValid(); + } + + @Test + public void testNotValidBothUserPasswordAndClientCertAuth() throws Exception { + testRunner.setProperty(AbstractAMQPProcessor.USER, "user"); + testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "password"); + testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true"); + configureSSLContextService(); + + testRunner.assertNotValid(); + } + + @Test + public void testValidClientCertAuth() throws Exception { + testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true"); + configureSSLContextService(); + + testRunner.assertValid(); + } + + @Test + public void testNotValidClientCertAuthButNoSSLContextService() throws Exception { + testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true"); + + testRunner.assertNotValid(); + } + + private void configureSSLContextService() throws InitializationException { SSLContextService sslService = mock(SSLContextService.class); when(sslService.getIdentifier()).thenReturn("ssl-context"); testRunner.addControllerService("ssl-context", sslService); testRunner.enableControllerService(sslService); + testRunner.setProperty(AbstractAMQPProcessor.SSL_CONTEXT_SERVICE, "ssl-context"); - testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "false"); - testRunner.setProperty(AbstractAMQPProcessor.HOST, "test"); - testRunner.setProperty(AbstractAMQPProcessor.PORT, "9999"); - testRunner.setProperty(AbstractAMQPProcessor.USER, "test"); - testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "test"); - testRunner.assertValid(sslService); - testRunner.setProperty(AbstractAMQPProcessor.CLIENT_AUTH, "BAD"); - processor.onTrigger(testRunner.getProcessContext(), testRunner.getProcessSessionFactory()); - } - - @Test(expected = IllegalStateException.class) - public void testInvalidSSLConfiguration() throws Exception { - // it's invalid to have use_cert_auth enabled and not have the SSL Context Service configured - testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true"); - testRunner.setProperty(AbstractAMQPProcessor.HOST, "test"); - testRunner.setProperty(AbstractAMQPProcessor.PORT, "9999"); - testRunner.setProperty(AbstractAMQPProcessor.USER, "test"); - testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "test"); - processor.onTrigger(testRunner.getProcessContext(), testRunner.getProcessSessionFactory()); - } - - /** - * Provides a stubbed processor instance for testing - */ - public static class MockAbstractAMQPProcessor extends AbstractAMQPProcessor { - @Override - protected void processResource(Connection connection, AMQPConsumer consumer, ProcessContext context, ProcessSession session) throws ProcessException { - // nothing to do - } - - @Override - protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) { - return null; - } } } diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java index 6374fa627a..1a7fc0ae59 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java @@ -55,9 +55,7 @@ public class ConsumeAMQPTest { sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ConsumeAMQP.HOST, "injvm"); - runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + TestRunner runner = initTestRunner(proc); runner.setProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE, "false"); runner.run(); @@ -88,9 +86,7 @@ public class ConsumeAMQPTest { sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ConsumeAMQP.HOST, "injvm"); - runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + TestRunner runner = initTestRunner(proc); runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1"); runner.run(2); @@ -122,9 +118,7 @@ public class ConsumeAMQPTest { sender.publish("good-bye".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); LocalConsumeAMQP proc = new LocalConsumeAMQP(connection); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ConsumeAMQP.HOST, "injvm"); - runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + TestRunner runner = initTestRunner(proc); runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1"); runner.run(); @@ -150,7 +144,7 @@ public class ConsumeAMQPTest { } @Test - public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception { + public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { final Map> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2")); final Map exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1"); @@ -160,9 +154,7 @@ public class ConsumeAMQPTest { sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange"); ConsumeAMQP proc = new LocalConsumeAMQP(connection); - TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(ConsumeAMQP.HOST, "injvm"); - runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + TestRunner runner = initTestRunner(proc); runner.run(); final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0); @@ -170,6 +162,15 @@ public class ConsumeAMQPTest { } } + private TestRunner initTestRunner(ConsumeAMQP proc) { + TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(ConsumeAMQP.HOST, "injvm"); + runner.setProperty(ConsumeAMQP.QUEUE, "queue1"); + runner.setProperty(ConsumeAMQP.USER, "user"); + runner.setProperty(ConsumeAMQP.PASSWORD, "password"); + return runner; + } + public static class LocalConsumeAMQP extends ConsumeAMQP { private final Connection connection; private AMQPConsumer consumer; diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java index cee44a178e..0464e8eddf 100644 --- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java +++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java @@ -41,12 +41,14 @@ import com.rabbitmq.client.GetResponse; public class PublishAMQPTest { @Test - public void validateSuccessfullPublishAndTransferToSuccess() throws Exception { + public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { final PublishAMQP pubProc = new LocalPublishAMQP(); final TestRunner runner = TestRunners.newTestRunner(pubProc); runner.setProperty(PublishAMQP.HOST, "injvm"); runner.setProperty(PublishAMQP.EXCHANGE, "myExchange"); runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); + runner.setProperty(PublishAMQP.USER, "user"); + runner.setProperty(PublishAMQP.PASSWORD, "password"); final Map attributes = new HashMap<>(); attributes.put("foo", "bar"); @@ -110,6 +112,8 @@ public class PublishAMQPTest { runner.setProperty(PublishAMQP.HOST, "injvm"); runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone"); runner.setProperty(PublishAMQP.ROUTING_KEY, "key1"); + runner.setProperty(PublishAMQP.USER, "user"); + runner.setProperty(PublishAMQP.PASSWORD, "password"); runner.enqueue("Hello Joe".getBytes());