NIFI-908 Added support for SSL in JMS connections.

- Added SSL context to JMS producer and consumer processors
 - Tony Kurc Amended patch to check SSL need by scheme and exception consistency
Reviewed by Tony Kurc (tkurc@apache.org)
This commit is contained in:
Luke Williamson 2015-10-26 00:29:05 -04:00 committed by Tony Kurc
parent 8d2f9bc64b
commit 26edab3185
4 changed files with 86 additions and 5 deletions

View File

@ -25,6 +25,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_T
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
@ -87,6 +88,7 @@ public abstract class JmsConsumer extends AbstractProcessor {
descriptors.add(BATCH_SIZE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(ACKNOWLEDGEMENT_MODE);
descriptors.add(MESSAGE_SELECTOR);
descriptors.add(JMS_PROPS_TO_ATTRIBUTES);

View File

@ -47,6 +47,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.REPLY_TO_QU
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import java.io.IOException;
import java.io.InputStream;
@ -122,6 +123,7 @@ public class PutJMS extends AbstractProcessor {
descriptors.add(BATCH_SIZE);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(MESSAGE_TYPE);
descriptors.add(MESSAGE_PRIORITY);
descriptors.add(REPLY_TO_QUEUE);

View File

@ -28,12 +28,15 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUB
import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER;
import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR;
import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD;
import static org.apache.nifi.processors.standard.util.JmsProperties.SSL_CONTEXT_SERVICE;
import static org.apache.nifi.processors.standard.util.JmsProperties.TIMEOUT;
import static org.apache.nifi.processors.standard.util.JmsProperties.URL;
import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
@ -57,12 +60,15 @@ import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.processor.ProcessContext;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSslConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
public class JmsFactory {
@ -348,10 +354,43 @@ public class JmsFactory {
}
private static ConnectionFactory createConnectionFactory(final ProcessContext context) throws JMSException {
final String url = context.getProperty(URL).getValue();
final URI uri;
try {
uri = new URI(context.getProperty(URL).getValue());
} catch (URISyntaxException e) {
// Should not happen - URL was validated
throw new IllegalArgumentException("Validated URI [" + context.getProperty(URL) + "] was invalid", e);
}
final int timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final String provider = context.getProperty(JMS_PROVIDER).getValue();
return createConnectionFactory(url, timeoutMillis, provider);
if (uri.getScheme().equals("ssl") || (URISupport.isCompositeURI(uri) && compositeURIHasSSL(uri))) {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService == null) {
throw new IllegalArgumentException("Attempting to initiate SSL JMS connection and SSL Context is not set.");
}
return createSslConnectionFactory(uri, timeoutMillis, provider, sslContextService.getKeyStoreFile(),
sslContextService.getKeyStorePassword(), sslContextService.getTrustStoreFile(), sslContextService.getTrustStorePassword());
} else {
return createConnectionFactory(uri, timeoutMillis, provider);
}
}
private static boolean compositeURIHasSSL(URI uri) {
try {
CompositeData compositeData = URISupport.parseComposite(uri);
for(URI component : compositeData.getComponents()){
if(component.getScheme().equals("ssl")){
return true;
}
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Attempting to initiate JMS with invalid composite URI [" + uri + "]", e);
}
return false;
}
public static ConnectionFactory createConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider) throws JMSException {
return createConnectionFactory(uri.toString(), timeoutMillis, jmsProvider);
}
public static ConnectionFactory createConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider) throws JMSException {
@ -366,6 +405,36 @@ public class JmsFactory {
}
}
public static ConnectionFactory createSslConnectionFactory(final URI uri, final int timeoutMillis, final String jmsProvider,
final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
return createSslConnectionFactory(uri.toString(), timeoutMillis, jmsProvider, keystore, keystorePassword, truststore, truststorePassword);
}
public static ConnectionFactory createSslConnectionFactory(final String url, final int timeoutMillis, final String jmsProvider,
final String keystore, final String keystorePassword, final String truststore, final String truststorePassword) throws JMSException {
switch (jmsProvider) {
case ACTIVEMQ_PROVIDER: {
final ActiveMQSslConnectionFactory factory = new ActiveMQSslConnectionFactory(url);
try {
factory.setKeyStore(keystore);
} catch (Exception e) {
throw new JMSException("Problem Setting the KeyStore: " + e.getMessage());
}
factory.setKeyStorePassword(keystorePassword);
try {
factory.setTrustStore(truststore);
} catch (Exception e) {
throw new JMSException("Problem Setting the TrustStore: " + e.getMessage());
}
factory.setTrustStorePassword(truststorePassword);
factory.setSendTimeout(timeoutMillis);
return factory;
}
default:
throw new IllegalArgumentException("Unknown JMS Provider: " + jmsProvider);
}
}
public static Map<String, String> createAttributeMap(final Message message) throws JMSException {
final Map<String, String> attributes = new HashMap<>();

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.standard.util;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
public class JmsProperties {
@ -177,4 +178,11 @@ public class JmsProperties {
.defaultValue("1 MB")
.build();
// JMS SSL Properties
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
}