NIFI-1521 Added SSL configuration to AMQP processor.

fixed build failure (+5 squashed commits)
Squashed commits:
[a3405f8] NIFI-1521 fixed build failure
[bf91743] NIFI-1521 fixed name/displayName in properties
[a44beaa] NIFI-1521 Added unit test
[c523689] NIFI-1521 Added client auth property and reverted modification on SSL context service
[75f3457] NIFI-1521 Allows use of SSL in AMQP Processor

This closes #232.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Pierre Villard 2016-02-17 16:30:09 +01:00 committed by Andy LoPresto
parent f94b0f2ed2
commit b50cf7d4d4
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
4 changed files with 136 additions and 1 deletions

View File

@ -28,6 +28,11 @@
<source.skip>true</source.skip> <source.skip>true</source.skip>
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-amqp-processors</artifactId> <artifactId>nifi-amqp-processors</artifactId>

View File

@ -33,7 +33,10 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId> <artifactId>nifi-processor-utils</artifactId>

View File

@ -20,7 +20,11 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -28,6 +32,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConnectionFactory;
@ -84,6 +90,23 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.allowableValues("0.9.1") .allowableValues("0.9.1")
.defaultValue("0.9.1") .defaultValue("0.9.1")
.build(); .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("ssl-client-auth")
.displayName("Client Auth")
.description("Client authentication policy when connecting to secure (TLS/SSL) AMQP broker. "
+ "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ "has been defined and enabled.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.values())
.defaultValue("REQUIRED")
.build();
static List<PropertyDescriptor> descriptors = new ArrayList<>(); static List<PropertyDescriptor> descriptors = new ArrayList<>();
@ -98,6 +121,8 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
descriptors.add(USER); descriptors.add(USER);
descriptors.add(PASSWORD); descriptors.add(PASSWORD);
descriptors.add(AMQP_VERSION); descriptors.add(AMQP_VERSION);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
} }
protected volatile Connection amqpConnection; protected volatile Connection amqpConnection;
@ -192,6 +217,33 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
cf.setVirtualHost(vHost); cf.setVirtualHost(vHost);
} }
// handles TLS/SSL aspects
final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
final SSLContext sslContext;
if (sslService != null) {
final SSLContextService.ClientAuth clientAuth;
if (StringUtils.isBlank(rawClientAuth)) {
clientAuth = SSLContextService.ClientAuth.REQUIRED;
} else {
try {
clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
} catch (final IllegalArgumentException iae) {
throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
}
}
sslContext = sslService.createSSLContext(clientAuth);
} else {
sslContext = null;
}
// check if the ssl context is set and add it to the factory if so
if (sslContext != null) {
cf.useSslProtocol(sslContext);
}
try { try {
Connection connection = cf.newConnection(); Connection connection = cf.newConnection();
return connection; return connection;

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.amqp.processors;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
/**
* Unit tests for the AbstractAMQPProcessor class
*/
public class AbstractAMQPProcessorTest {
MockAbstractAMQPProcessor processor;
private TestRunner testRunner;
@Before
public void setUp() throws Exception {
processor = new MockAbstractAMQPProcessor();
testRunner = TestRunners.newTestRunner(processor);
}
@Test(expected = ProviderCreationException.class)
public void testConnectToCassandraWithSSLBadClientAuth() throws Exception {
SSLContextService sslService = mock(SSLContextService.class);
when(sslService.getIdentifier()).thenReturn("ssl-context");
testRunner.addControllerService("ssl-context", sslService);
testRunner.enableControllerService(sslService);
testRunner.setProperty(AbstractAMQPProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
testRunner.setProperty(AbstractAMQPProcessor.HOST, "test");
testRunner.setProperty(AbstractAMQPProcessor.PORT, "9999");
testRunner.setProperty(AbstractAMQPProcessor.USER, "test");
testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "test");
testRunner.assertValid(sslService);
testRunner.setProperty(AbstractAMQPProcessor.CLIENT_AUTH, "BAD");
processor.onTrigger(testRunner.getProcessContext(), testRunner.getProcessSessionFactory());
}
/**
* Provides a stubbed processor instance for testing
*/
public static class MockAbstractAMQPProcessor extends AbstractAMQPProcessor<AMQPConsumer> {
@Override
protected void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException {
// nothing to do
}
@Override
protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) {
return null;
}
}
}