NIFI-7654: Deprecated Client Auth property on AMQP processors

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4412.
This commit is contained in:
Peter Turcsanyi 2020-07-16 00:32:38 +02:00 committed by Pierre Villard
parent f6c2824f74
commit 21c3085d15
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
4 changed files with 125 additions and 89 deletions

View File

@ -20,14 +20,16 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultSaslConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -74,16 +76,14 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
.name("User Name")
.description("User Name used for authentication and authorization.")
.required(true)
.defaultValue("guest")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
.description("Password used for authentication and authorization.")
.required(true)
.defaultValue("guest")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
@ -103,8 +103,8 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.build();
public static final PropertyDescriptor USE_CERT_AUTHENTICATION = new PropertyDescriptor.Builder()
.name("cert-authentication")
.displayName("Use Certificate Authentication")
.description("Authenticate using the SSL certificate common name rather than user name/password.")
.displayName("Use Client Certificate Authentication")
.description("Authenticate using the SSL certificate rather than user name/password.")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
@ -113,12 +113,10 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("ssl-client-auth")
.displayName("Client Auth")
.description("Client authentication policy when connecting to secure (TLS/SSL) AMQP broker. "
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ "has been defined and enabled.")
.description("The property has no effect and therefore deprecated.")
.required(false)
.allowableValues(SslContextFactory.ClientAuth.values())
.defaultValue("REQUIRED")
.defaultValue("NONE")
.build();
private static final List<PropertyDescriptor> propertyDescriptors;
@ -144,6 +142,46 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
private final BlockingQueue<AMQPResource<T>> resourceQueue = new LinkedBlockingQueue<>();
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
boolean userConfigured = context.getProperty(USER).isSet();
boolean passwordConfigured = context.getProperty(PASSWORD).isSet();
boolean sslServiceConfigured = context.getProperty(SSL_CONTEXT_SERVICE).isSet();
boolean useCertAuthentication = context.getProperty(USE_CERT_AUTHENTICATION).asBoolean();
if (useCertAuthentication && (userConfigured || passwordConfigured)) {
results.add(new ValidationResult.Builder()
.subject("Authentication configuration")
.valid(false)
.explanation(String.format("'%s' with '%s' and '%s' cannot be configured at the same time",
USER.getDisplayName(), PASSWORD.getDisplayName(),
USE_CERT_AUTHENTICATION.getDisplayName()))
.build());
}
if (!useCertAuthentication && (!userConfigured || !passwordConfigured)) {
results.add(new ValidationResult.Builder()
.subject("Authentication configuration")
.valid(false)
.explanation(String.format("either '%s' with '%s' or '%s' must be configured",
USER.getDisplayName(), PASSWORD.getDisplayName(),
USE_CERT_AUTHENTICATION.getDisplayName()))
.build());
}
if (useCertAuthentication && !sslServiceConfigured) {
results.add(new ValidationResult.Builder()
.subject("SSL configuration")
.valid(false)
.explanation(String.format("'%s' has been set but no '%s' configured",
USE_CERT_AUTHENTICATION.getDisplayName(), SSL_CONTEXT_SERVICE.getDisplayName()))
.build());
}
return results;
}
/**
* Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the
* implementation of {@link #processResource} method for further processing.
@ -216,32 +254,15 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
}
// handles TLS/SSL aspects
final Boolean useCertAuthentication = context.getProperty(USE_CERT_AUTHENTICATION).asBoolean();
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
// if the property to use cert authentication is set but the SSL service hasn't been configured, throw an exception.
if (useCertAuthentication && sslService == null) {
throw new IllegalStateException("This processor is configured to use cert authentication, " +
"but the SSL Context Service hasn't been configured. You need to configure the SSL Context Service.");
}
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
final Boolean useCertAuthentication = context.getProperty(USE_CERT_AUTHENTICATION).asBoolean();
if (sslService != null) {
final SslContextFactory.ClientAuth clientAuth;
if (StringUtils.isBlank(rawClientAuth)) {
clientAuth = SslContextFactory.ClientAuth.REQUIRED;
} else {
try {
clientAuth = SslContextFactory.ClientAuth.valueOf(rawClientAuth);
} catch (final IllegalArgumentException iae) {
throw new IllegalStateException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
}
}
final SSLContext sslContext = sslService.createSSLContext(clientAuth);
final SSLContext sslContext = sslService.createSSLContext(SslContextFactory.ClientAuth.NONE);
cf.useSslProtocol(sslContext);
if (useCertAuthentication) {
// this tells the factory to use the cert common name for authentication and not user name and password
// this tells the factory to use the client certificate for authentication and not user name and password
// REF: https://github.com/rabbitmq/rabbitmq-auth-mechanism-ssl
cf.setSaslConfig(DefaultSaslConfig.EXTERNAL);
}

View File

@ -16,10 +16,7 @@
*/
package org.apache.nifi.amqp.processors;
import com.rabbitmq.client.Connection;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -35,55 +32,68 @@ import static org.mockito.Mockito.when;
*/
public class AbstractAMQPProcessorTest {
MockAbstractAMQPProcessor processor;
private TestRunner testRunner;
@Before
public void setUp() throws Exception {
processor = new MockAbstractAMQPProcessor();
testRunner = TestRunners.newTestRunner(processor);
public void setUp() {
testRunner = TestRunners.newTestRunner(ConsumeAMQP.class);
testRunner.setProperty(ConsumeAMQP.QUEUE, "queue");
}
@Test(expected = IllegalStateException.class)
public void testConnectToCassandraWithSSLBadClientAuth() throws Exception {
@Test
public void testValidUserPassword() {
testRunner.setProperty(AbstractAMQPProcessor.USER, "user");
testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "password");
testRunner.assertValid();
}
@Test
public void testNotValidUserMissing() {
testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "password");
testRunner.assertNotValid();
}
@Test
public void testNotValidPasswordMissing() {
testRunner.setProperty(AbstractAMQPProcessor.USER, "user");
testRunner.assertNotValid();
}
@Test
public void testNotValidBothUserPasswordAndClientCertAuth() throws Exception {
testRunner.setProperty(AbstractAMQPProcessor.USER, "user");
testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "password");
testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true");
configureSSLContextService();
testRunner.assertNotValid();
}
@Test
public void testValidClientCertAuth() throws Exception {
testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true");
configureSSLContextService();
testRunner.assertValid();
}
@Test
public void testNotValidClientCertAuthButNoSSLContextService() throws Exception {
testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true");
testRunner.assertNotValid();
}
private void configureSSLContextService() throws InitializationException {
SSLContextService sslService = mock(SSLContextService.class);
when(sslService.getIdentifier()).thenReturn("ssl-context");
testRunner.addControllerService("ssl-context", sslService);
testRunner.enableControllerService(sslService);
testRunner.setProperty(AbstractAMQPProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "false");
testRunner.setProperty(AbstractAMQPProcessor.HOST, "test");
testRunner.setProperty(AbstractAMQPProcessor.PORT, "9999");
testRunner.setProperty(AbstractAMQPProcessor.USER, "test");
testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "test");
testRunner.assertValid(sslService);
testRunner.setProperty(AbstractAMQPProcessor.CLIENT_AUTH, "BAD");
processor.onTrigger(testRunner.getProcessContext(), testRunner.getProcessSessionFactory());
}
@Test(expected = IllegalStateException.class)
public void testInvalidSSLConfiguration() throws Exception {
// it's invalid to have use_cert_auth enabled and not have the SSL Context Service configured
testRunner.setProperty(AbstractAMQPProcessor.USE_CERT_AUTHENTICATION, "true");
testRunner.setProperty(AbstractAMQPProcessor.HOST, "test");
testRunner.setProperty(AbstractAMQPProcessor.PORT, "9999");
testRunner.setProperty(AbstractAMQPProcessor.USER, "test");
testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "test");
processor.onTrigger(testRunner.getProcessContext(), testRunner.getProcessSessionFactory());
}
/**
* Provides a stubbed processor instance for testing
*/
public static class MockAbstractAMQPProcessor extends AbstractAMQPProcessor<AMQPConsumer> {
@Override
protected void processResource(Connection connection, AMQPConsumer consumer, ProcessContext context, ProcessSession session) throws ProcessException {
// nothing to do
}
@Override
protected AMQPConsumer createAMQPWorker(ProcessContext context, Connection connection) {
return null;
}
}
}

View File

@ -55,9 +55,7 @@ public class ConsumeAMQPTest {
sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.HOST, "injvm");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE, "false");
runner.run();
@ -88,9 +86,7 @@ public class ConsumeAMQPTest {
sender.publish("world".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.HOST, "injvm");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");
runner.run(2);
@ -122,9 +118,7 @@ public class ConsumeAMQPTest {
sender.publish("good-bye".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
LocalConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.HOST, "injvm");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
TestRunner runner = initTestRunner(proc);
runner.setProperty(ConsumeAMQP.BATCH_SIZE, "1");
runner.run();
@ -150,7 +144,7 @@ public class ConsumeAMQPTest {
}
@Test
public void validateSuccessfullConsumeAndTransferToSuccess() throws Exception {
public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
@ -160,9 +154,7 @@ public class ConsumeAMQPTest {
sender.publish("hello".getBytes(), MessageProperties.PERSISTENT_TEXT_PLAIN, "key1", "myExchange");
ConsumeAMQP proc = new LocalConsumeAMQP(connection);
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.HOST, "injvm");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
TestRunner runner = initTestRunner(proc);
runner.run();
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishAMQP.REL_SUCCESS).get(0);
@ -170,6 +162,15 @@ public class ConsumeAMQPTest {
}
}
private TestRunner initTestRunner(ConsumeAMQP proc) {
TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(ConsumeAMQP.HOST, "injvm");
runner.setProperty(ConsumeAMQP.QUEUE, "queue1");
runner.setProperty(ConsumeAMQP.USER, "user");
runner.setProperty(ConsumeAMQP.PASSWORD, "password");
return runner;
}
public static class LocalConsumeAMQP extends ConsumeAMQP {
private final Connection connection;
private AMQPConsumer consumer;

View File

@ -41,12 +41,14 @@ import com.rabbitmq.client.GetResponse;
public class PublishAMQPTest {
@Test
public void validateSuccessfullPublishAndTransferToSuccess() throws Exception {
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
final PublishAMQP pubProc = new LocalPublishAMQP();
final TestRunner runner = TestRunners.newTestRunner(pubProc);
runner.setProperty(PublishAMQP.HOST, "injvm");
runner.setProperty(PublishAMQP.EXCHANGE, "myExchange");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");
runner.setProperty(PublishAMQP.PASSWORD, "password");
final Map<String, String> attributes = new HashMap<>();
attributes.put("foo", "bar");
@ -110,6 +112,8 @@ public class PublishAMQPTest {
runner.setProperty(PublishAMQP.HOST, "injvm");
runner.setProperty(PublishAMQP.EXCHANGE, "badToTheBone");
runner.setProperty(PublishAMQP.ROUTING_KEY, "key1");
runner.setProperty(PublishAMQP.USER, "user");
runner.setProperty(PublishAMQP.PASSWORD, "password");
runner.enqueue("Hello Joe".getBytes());