diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml
index 491e0d809d..6cd602484b 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-nar/pom.xml
@@ -28,6 +28,11 @@
true
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ nar
+
org.apache.nifi
nifi-amqp-processors
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
index 5652bd48d8..1aba5fd0ca 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/pom.xml
@@ -33,7 +33,10 @@
org.apache.nifi
nifi-api
-
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+
org.apache.nifi
nifi-processor-utils
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index e572870721..af24cb7cd7 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -20,7 +20,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -28,6 +32,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
@@ -84,6 +90,23 @@ abstract class AbstractAMQPProcessor extends AbstractProce
.allowableValues("0.9.1")
.defaultValue("0.9.1")
.build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("ssl-context-service")
+ .displayName("SSL Context Service")
+ .description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+ public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
+ .name("ssl-client-auth")
+ .displayName("Client Auth")
+ .description("Client authentication policy when connecting to secure (TLS/SSL) AMQP broker. "
+ + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context "
+ + "has been defined and enabled.")
+ .required(false)
+ .allowableValues(SSLContextService.ClientAuth.values())
+ .defaultValue("REQUIRED")
+ .build();
static List descriptors = new ArrayList<>();
@@ -98,6 +121,8 @@ abstract class AbstractAMQPProcessor extends AbstractProce
descriptors.add(USER);
descriptors.add(PASSWORD);
descriptors.add(AMQP_VERSION);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(CLIENT_AUTH);
}
protected volatile Connection amqpConnection;
@@ -192,6 +217,33 @@ abstract class AbstractAMQPProcessor extends AbstractProce
cf.setVirtualHost(vHost);
}
+ // handles TLS/SSL aspects
+ final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue();
+ final SSLContext sslContext;
+
+ if (sslService != null) {
+ final SSLContextService.ClientAuth clientAuth;
+ if (StringUtils.isBlank(rawClientAuth)) {
+ clientAuth = SSLContextService.ClientAuth.REQUIRED;
+ } else {
+ try {
+ clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth);
+ } catch (final IllegalArgumentException iae) {
+ throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]",
+ rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", ")));
+ }
+ }
+ sslContext = sslService.createSSLContext(clientAuth);
+ } else {
+ sslContext = null;
+ }
+
+ // check if the ssl context is set and add it to the factory if so
+ if (sslContext != null) {
+ cf.useSslProtocol(sslContext);
+ }
+
try {
Connection connection = cf.newConnection();
return connection;
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
new file mode 100644
index 0000000000..be709b3eed
--- /dev/null
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessorTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for the AbstractAMQPProcessor class
+ */
+public class AbstractAMQPProcessorTest {
+
+ MockAbstractAMQPProcessor processor;
+ private TestRunner testRunner;
+
+ @Before
+ public void setUp() throws Exception {
+ processor = new MockAbstractAMQPProcessor();
+ testRunner = TestRunners.newTestRunner(processor);
+ }
+
+ @Test(expected = ProviderCreationException.class)
+ public void testConnectToCassandraWithSSLBadClientAuth() throws Exception {
+ SSLContextService sslService = mock(SSLContextService.class);
+ when(sslService.getIdentifier()).thenReturn("ssl-context");
+ testRunner.addControllerService("ssl-context", sslService);
+ testRunner.enableControllerService(sslService);
+ testRunner.setProperty(AbstractAMQPProcessor.SSL_CONTEXT_SERVICE, "ssl-context");
+ testRunner.setProperty(AbstractAMQPProcessor.HOST, "test");
+ testRunner.setProperty(AbstractAMQPProcessor.PORT, "9999");
+ testRunner.setProperty(AbstractAMQPProcessor.USER, "test");
+ testRunner.setProperty(AbstractAMQPProcessor.PASSWORD, "test");
+ testRunner.assertValid(sslService);
+ testRunner.setProperty(AbstractAMQPProcessor.CLIENT_AUTH, "BAD");
+ processor.onTrigger(testRunner.getProcessContext(), testRunner.getProcessSessionFactory());
+ }
+
+ /**
+ * Provides a stubbed processor instance for testing
+ */
+ public static class MockAbstractAMQPProcessor extends AbstractAMQPProcessor {
+ @Override
+ protected void rendezvousWithAmqp(ProcessContext context, ProcessSession session) throws ProcessException {
+ // nothing to do
+ }
+ @Override
+ protected AMQPConsumer finishBuildingTargetResource(ProcessContext context) {
+ return null;
+ }
+ }
+}