From 26edab3185008f1d34647ff3c11ba8b87815de02 Mon Sep 17 00:00:00 2001 From: Luke Williamson Date: Mon, 26 Oct 2015 00:29:05 -0400 Subject: [PATCH] NIFI-908 Added support for SSL in JMS connections. - Added SSL context to JMS producer and consumer processors - Tony Kurc Amended patch to check SSL need by scheme and exception consistency Reviewed by Tony Kurc (tkurc@apache.org) --- .../nifi/processors/standard/JmsConsumer.java | 2 + .../nifi/processors/standard/PutJMS.java | 2 + .../processors/standard/util/JmsFactory.java | 79 +++++++++++++++++-- .../standard/util/JmsProperties.java | 8 ++ 4 files changed, 86 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java index b53d62f240..461d3816b9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java @@ -25,6 +25,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_T import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER; import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR; import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD; +import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE; import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT; import static org.apache.nifi.processors.standard.util.JmsProperties.URL; import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME; @@ -87,6 +88,7 @@ public abstract class JmsConsumer extends AbstractProcessor { descriptors.add(BATCH_SIZE); descriptors.add(USERNAME); descriptors.add(PASSWORD); + descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(ACKNOWLEDGEMENT_MODE); descriptors.add(MESSAGE_SELECTOR); descriptors.add(JMS_PROPS_TO_ATTRIBUTES); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java index dff5a6b600..b8902a91ac 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutJMS.java @@ -47,6 +47,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QU import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT; import static org.apache.nifi.processors.standard.util.JmsProperties.URL; import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME; +import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE; import java.io.IOException; import java.io.InputStream; @@ -122,6 +123,7 @@ public class PutJMS extends AbstractProcessor { descriptors.add(BATCH_SIZE); descriptors.add(USERNAME); descriptors.add(PASSWORD); + descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(MESSAGE_TYPE); descriptors.add(MESSAGE_PRIORITY); descriptors.add(REPLY_TO_QUEUE); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java index 35a65dcd78..ca5df9f4eb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java @@ -28,12 +28,15 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUB import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER; import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR; import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD; +import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE; import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT; import static org.apache.nifi.processors.standard.util.JmsProperties.URL; import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME; import java.io.IOException; import java.io.ObjectOutputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; @@ -57,12 +60,15 @@ import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.processor.ProcessContext; - import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSslConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.util.URISupport.CompositeData; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.stream.io.ByteArrayOutputStream; public class JmsFactory { @@ -348,10 +354,43 @@ public class JmsFactory { } private static ConnectionFactory createConnectionFactory(final ProcessContext context) throws JMSException { - final String url = context.getProperty(URL).getValue(); + final URI uri; + try { + uri = new URI(context.getProperty(URL).getValue()); + } catch (URISyntaxException e) { + // Should not happen - URL was validated + throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e); + } final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); final String provider = context.getProperty(JMS_PROVIDER).getValue(); - return createConnectionFactory(url, timeoutMillis, provider); + if (uri.getScheme().equals("ssl") || (URISupport.isCompositeURI(uri) && compositeURIHasSSL(uri))) { + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService == null) { + throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set."); + } + return createSslConnectionFactory(uri, timeoutMillis, provider, sslContextService.getKeyStoreFile(), + sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword()); + } else { + return createConnectionFactory(uri, timeoutMillis, provider); + } + } + + private static boolean compositeURIHasSSL(URI uri) { + try { + CompositeData compositeData = URISupport.parseComposite(uri); + for(URI component : compositeData.getComponents()){ + if(component.getScheme().equals("ssl")){ + return true; + } + } + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Attempting to initiate JMS with invalid composite URI [" + uri + "]", e); + } + return false; + } + + public static ConnectionFactory createConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider) throws JMSException { + return createConnectionFactory(uri.toString(), timeoutMillis, jmsProvider); } public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException { @@ -366,6 +405,36 @@ public class JmsFactory { } } + public static ConnectionFactory createSslConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider, + final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException { + return createSslConnectionFactory(uri.toString(), timeoutMillis, jmsProvider, keystore, keystorePassword, truststore, truststorePassword); + } + + public static ConnectionFactory createSslConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider, + final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException { + switch (jmsProvider) { + case ACTIVEMQ_PROVIDER: { + final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(url); + try { + factory.setKeyStore(keystore); + } catch (Exception e) { + throw new JMSException("Problem Setting the KeyStore: " + e.getMessage()); + } + factory.setKeyStorePassword(keystorePassword); + try { + factory.setTrustStore(truststore); + } catch (Exception e) { + throw new JMSException("Problem Setting the TrustStore: " + e.getMessage()); + } + factory.setTrustStorePassword(truststorePassword); + factory.setSendTimeout(timeoutMillis); + return factory; + } + default: + throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider); + } + } + public static Map createAttributeMap(final Message message) throws JMSException { final Map attributes = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java index ed73569362..f538624243 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsProperties.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard.util; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; public class JmsProperties { @@ -177,4 +178,11 @@ public class JmsProperties { .defaultValue("1 MB") .build(); + // JMS SSL Properties + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); }