NIFI-7226: Add Connection Factory configuration properties to PublishJMS and ConsumeJMS processors

Some JMS client libraries may not work with the existing controller services due to incompatible
classloader handling between the 3rd party library and NiFi.
Via configuring the Connection Factory on the processor itself, only the processor's and its
children's classloaders will be used which eliminates the mentioned incompatibility.

This closes #4110.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Peter Turcsanyi 2020-03-04 19:11:43 +01:00 committed by Mark Payne
parent 040c8a0af9
commit 7c57e75da4
20 changed files with 1507 additions and 693 deletions

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.cf;
import javax.jms.ConnectionFactory;
/**
* Defines a strategy to create implementations to load and initialize third
* party implementations of the {@link ConnectionFactory}
*/
public interface IJMSConnectionFactoryProvider {
/**
* Returns an instance of the {@link ConnectionFactory} specific to the
* target messaging system (eg. org.apache.activemq.ActiveMQConnectionFactory).
*
* @return instance of {@link ConnectionFactory}
*/
ConnectionFactory getConnectionFactory();
/**
* Resets {@link ConnectionFactory}.
* Provider should reset {@link ConnectionFactory} only if a copy provided by a client matches
* current {@link ConnectionFactory}.
* @param cachedFactory - {@link ConnectionFactory} cached by client.
*/
void resetConnectionFactory(ConnectionFactory cachedFactory);
}

View File

@ -16,31 +16,10 @@
*/
package org.apache.nifi.jms.cf;
import javax.jms.ConnectionFactory;
import org.apache.nifi.controller.ControllerService;
/**
* Defines a strategy to create implementations to load and initialize third
* party implementations of the {@link ConnectionFactory}
* Base interface of controller service implementations of IJMSConnectionFactoryProvider.
*/
public interface JMSConnectionFactoryProviderDefinition extends ControllerService {
/**
* Returns an instance of the {@link ConnectionFactory} specific to the
* target messaging system (i.e.,
* org.apache.activemq.ActiveMQConnectionFactory).
*
* @return instance of {@link ConnectionFactory}
*/
ConnectionFactory getConnectionFactory();
/**
* Resets {@link ConnectionFactory}.
* Provider should reset {@link ConnectionFactory} only if a copy provided by a client matches
* current {@link ConnectionFactory}.
* @param cachedFactory - {@link ConnectionFactory} cached by client.
*/
void resetConnectionFactory(ConnectionFactory cachedFactory);
public interface JMSConnectionFactoryProviderDefinition extends IJMSConnectionFactoryProvider, ControllerService {
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.cf;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import javax.jms.ConnectionFactory;
import javax.net.ssl.SSLContext;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static org.apache.nifi.jms.cf.JMSConnectionFactoryProperties.JMS_BROKER_URI;
import static org.apache.nifi.jms.cf.JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL;
import static org.apache.nifi.jms.cf.JMSConnectionFactoryProperties.JMS_SSL_CONTEXT_SERVICE;
/**
* Handler class to create a JMS Connection Factory by instantiating the vendor specific javax.jms.ConnectionFactory
* implementation class and configuring the Connection Factory object directly.
* The handler can be used from controller services and processors as well.
*/
public class JMSConnectionFactoryHandler implements IJMSConnectionFactoryProvider {
private final PropertyContext context;
private final Set<PropertyDescriptor> propertyDescriptors;
private final ComponentLog logger;
public JMSConnectionFactoryHandler(ConfigurationContext context, ComponentLog logger) {
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
public JMSConnectionFactoryHandler(ProcessContext context, ComponentLog logger) {
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
private volatile ConnectionFactory connectionFactory;
@Override
public synchronized ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
initConnectionFactory();
} else {
logger.debug("Connection Factory has already been initialized. Will return cached instance.");
}
return connectionFactory;
}
@Override
public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) {
if (cachedFactory == connectionFactory) {
logger.debug("Resetting connection factory");
connectionFactory = null;
}
}
private void initConnectionFactory() {
try {
if (logger.isInfoEnabled()) {
logger.info("Configuring " + getClass().getSimpleName() + " for '"
+ context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be connected to '"
+ context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue() + "'");
}
createConnectionFactoryInstance();
setConnectionFactoryProperties();
} catch (Exception e) {
connectionFactory = null;
logger.error("Failed to configure " + getClass().getSimpleName(), e);
throw new IllegalStateException(e);
}
}
/**
* Creates an instance of the {@link ConnectionFactory} from the provided
* 'CONNECTION_FACTORY_IMPL'.
*/
private void createConnectionFactoryInstance() {
String connectionFactoryImplName = context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
connectionFactory = Utils.newDefaultInstance(connectionFactoryImplName);
}
/**
* This operation follows standard bean convention by matching property name
* to its corresponding 'setter' method. Once the method was located it is
* invoked to set the corresponding property to a value provided by during
* service configuration. For example, 'channel' property will correspond to
* 'setChannel(..) method and 'queueManager' property will correspond to
* setQueueManager(..) method with a single argument. The bean convention is also
* explained in user manual for this component with links pointing to
* documentation of various ConnectionFactories.
* <p>
* There are also few adjustments to accommodate well known brokers. For
* example ActiveMQ ConnectionFactory accepts address of the Message Broker
* in a form of URL while IBMs in the form of host/port pair(s).
* <p>
* This method will use the value retrieved from the 'BROKER_URI' static
* property as is. An exception to this if ConnectionFactory implementation
* is coming from IBM MQ and connecting to a stand-alone queue manager. In
* this case the Broker URI is expected to be entered as a colon separated
* host/port pair, which then is split on ':' and the resulting pair will be
* used to execute setHostName(..) and setPort(..) methods on the provided
* ConnectionFactory.
* <p>
* This method may need to be maintained and adjusted to accommodate other
* implementation of ConnectionFactory, but only for URL/Host/Port issue.
* All other properties are set as dynamic properties where user essentially
* provides both property name and value.
*
* @see <a href="http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html#setBrokerURL-java.lang.String-">setBrokerURL(String brokerURL)</a>
* @see <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsConnectionFactory.html#setServerUrl(java.lang.String)">setServerUrl(String serverUrl)</a>
* @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setHostName_java.lang.String_">setHostName(String hostname)</a>
* @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setPort_int_">setPort(int port)</a>
* @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setConnectionNameList_java.lang.String_">setConnectionNameList(String hosts)</a>
* @see #setProperty(String propertyName, Object propertyValue)
*/
void setConnectionFactoryProperties() {
if (context.getProperty(JMS_BROKER_URI).isSet()) {
String brokerValue = context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue();
String connectionFactoryValue = context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
if (connectionFactoryValue.startsWith("org.apache.activemq")) {
setProperty("brokerURL", brokerValue);
} else if (connectionFactoryValue.startsWith("com.tibco.tibjms")) {
setProperty("serverUrl", brokerValue);
} else {
String[] brokerList = brokerValue.split(",");
if (connectionFactoryValue.startsWith("com.ibm.mq.jms")) {
List<String> ibmConList = new ArrayList<String>();
for (String broker : brokerList) {
String[] hostPort = broker.split(":");
if (hostPort.length == 2) {
ibmConList.add(hostPort[0] + "(" + hostPort[1] + ")");
} else {
ibmConList.add(broker);
}
}
setProperty("connectionNameList", String.join(",", ibmConList));
} else {
// Try to parse broker URI as colon separated host/port pair. Use first pair if multiple given.
String[] hostPort = brokerList[0].split(":");
if (hostPort.length == 2) {
// If broker URI indeed was colon separated host/port pair
setProperty("hostName", hostPort[0]);
setProperty("port", hostPort[1]);
}
}
}
}
SSLContextService sc = context.getProperty(JMS_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sc != null) {
SSLContext ssl = sc.createSSLContext(ClientAuth.NONE);
setProperty("sSLSocketFactory", ssl.getSocketFactory());
}
propertyDescriptors.stream()
.filter(PropertyDescriptor::isDynamic)
.forEach(descriptor -> {
String propertyName = descriptor.getName();
String propertyValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
setProperty(propertyName, propertyValue);
});
}
/**
* Sets corresponding {@link ConnectionFactory}'s property to a
* 'propertyValue' by invoking a 'setter' method that corresponds to
* 'propertyName'. For example, 'channel' property will correspond to
* 'setChannel(..) method and 'queueManager' property will correspond to
* setQueueManager(..) method with a single argument.
* <p>
* NOTE: There is a limited type conversion to accommodate property value
* types since all NiFi configuration properties comes as String. It is
* accomplished by checking the argument type of the method and executing
* its corresponding conversion to target primitive (e.g., value 'true' will
* go thru Boolean.parseBoolean(propertyValue) if method argument is of type
* boolean). None-primitive values are not supported at the moment and will
* result in {@link IllegalArgumentException}. It is OK though since based
* on analysis of several ConnectionFactory implementation the all seem to
* follow bean convention and all their properties using Java primitives as
* arguments.
*/
void setProperty(String propertyName, Object propertyValue) {
String methodName = toMethodName(propertyName);
Method[] methods = Utils.findMethods(methodName, connectionFactory.getClass());
if (methods != null && methods.length > 0) {
try {
for (Method method : methods) {
Class<?> returnType = method.getParameterTypes()[0];
if (String.class.isAssignableFrom(returnType)) {
method.invoke(connectionFactory, propertyValue);
return;
} else if (int.class.isAssignableFrom(returnType)) {
method.invoke(connectionFactory, Integer.parseInt((String) propertyValue));
return;
} else if (long.class.isAssignableFrom(returnType)) {
method.invoke(connectionFactory, Long.parseLong((String) propertyValue));
return;
} else if (boolean.class.isAssignableFrom(returnType)) {
method.invoke(connectionFactory, Boolean.parseBoolean((String) propertyValue));
return;
}
}
methods[0].invoke(connectionFactory, propertyValue);
} catch (Exception e) {
throw new IllegalStateException("Failed to set property " + propertyName, e);
}
} else if (propertyName.equals("hostName")) {
setProperty("host", propertyValue); // try 'host' as another common convention.
}
}
/**
* Will convert propertyName to a method name following bean convention. For
* example, 'channel' property will correspond to 'setChannel method and
* 'queueManager' property will correspond to setQueueManager method name
*/
private String toMethodName(String propertyName) {
char c[] = propertyName.toCharArray();
c[0] = Character.toUpperCase(c[0]);
return "set" + new String(c);
}
}

View File

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.cf;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import java.util.Arrays;
import java.util.List;
public class JMSConnectionFactoryProperties {
private static final String BROKER = "broker";
private static final String CF_IMPL = "cf";
private static final String CF_LIB = "cflib";
public static final PropertyDescriptor JMS_CONNECTION_FACTORY_IMPL = new PropertyDescriptor.Builder()
.name(CF_IMPL)
.displayName("JMS Connection Factory Implementation Class")
.description("The fully qualified name of the JMS ConnectionFactory implementation "
+ "class (eg. org.apache.activemq.ActiveMQConnectionFactory).")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor JMS_CLIENT_LIBRARIES = new PropertyDescriptor.Builder()
.name(CF_LIB)
.displayName("JMS Client Libraries")
.description("Path to the directory with additional resources (eg. JARs, configuration files etc.) to be added "
+ "to the classpath (defined as a comma separated list of values). Such resources typically represent target JMS client libraries "
+ "for the ConnectionFactory implementation.")
.required(false)
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor JMS_BROKER_URI = new PropertyDescriptor.Builder()
.name(BROKER)
.displayName("JMS Broker URI")
.description("URI pointing to the network location of the JMS Message broker. Example for ActiveMQ: "
+ "'tcp://myhost:61616'. Examples for IBM MQ: 'myhost(1414)' and 'myhost01(1414),myhost02(1414)'.")
.required(false)
.addValidator(new NonEmptyBrokerURIValidator())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor JMS_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.displayName("JMS SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Arrays.asList(
JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL,
JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES,
JMSConnectionFactoryProperties.JMS_BROKER_URI,
JMSConnectionFactoryProperties.JMS_SSL_CONTEXT_SERVICE
);
public static List<PropertyDescriptor> getPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
public static PropertyDescriptor getDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName
+ "' property to be set on the provided Connection Factory implementation.")
.name(propertyDescriptorName)
.required(false)
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
}
/**
* {@link Validator} that ensures that brokerURI's length > 0 after EL
* evaluation
*/
private static class NonEmptyBrokerURIValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
}
}
}

View File

@ -16,17 +16,6 @@
*/
package org.apache.nifi.jms.cf;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import javax.jms.ConnectionFactory;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@ -34,17 +23,12 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.ConnectionFactory;
import java.util.List;
/**
* Provides a factory service that creates and initializes
@ -52,14 +36,14 @@ import org.slf4j.LoggerFactory;
* <p>
* It accomplishes it by adjusting current classpath by adding to it the
* additional resources (i.e., JMS client libraries) provided by the user via
* {@link #CLIENT_LIB_DIR_PATH}, allowing it then to create an instance of the
* {@link JMSConnectionFactoryProperties#JMS_CLIENT_LIBRARIES}, allowing it then to create an instance of the
* target {@link ConnectionFactory} based on the provided
* {@link #CONNECTION_FACTORY_IMPL} which can be than access via
* {@link JMSConnectionFactoryProperties#JMS_CONNECTION_FACTORY_IMPL} which can be than access via
* {@link #getConnectionFactory()} method.
*/
@Tags({"jms", "messaging", "integration", "queue", "topic", "publish", "subscribe"})
@CapabilityDescription("Provides a generic service to create vendor specific javax.jms.ConnectionFactory implementations. "
+ "ConnectionFactory can be served once this service is configured successfully")
+ "The Connection Factory can be served once this service is configured successfully.")
@DynamicProperty(name = "The name of a Connection Factory configuration property.", value = "The value of a given Connection Factory configuration property.",
description = "The properties that are set following Java Beans convention where a property name is derived from the 'set*' method of the vendor "
+ "specific ConnectionFactory's implementation. For example, 'com.ibm.mq.jms.MQConnectionFactory.setChannel(String)' would imply 'channel' "
@ -68,289 +52,36 @@ import org.slf4j.LoggerFactory;
@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS"})
public class JMSConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition {
private final Logger logger = LoggerFactory.getLogger(JMSConnectionFactoryProvider.class);
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
private volatile boolean configured;
private volatile ConnectionFactory connectionFactory;
private static final String BROKER = "broker";
private static final String CF_IMPL = "cf";
private static final String CF_LIB = "cflib";
public static final PropertyDescriptor CONNECTION_FACTORY_IMPL = new PropertyDescriptor.Builder()
.name(CF_IMPL)
.displayName("MQ ConnectionFactory Implementation")
.description("A fully qualified name of the JMS ConnectionFactory implementation "
+ "class (i.e., org.apache.activemq.ActiveMQConnectionFactory)")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder()
.name(CF_LIB)
.displayName("MQ Client Libraries path (i.e. /usr/jms/lib)")
.description("Path to the directory with additional resources (i.e., JARs, configuration files etc.) to be added "
+ "to the classpath. Such resources typically represent target MQ client libraries for the "
+ "ConnectionFactory implementation. Required if target is not ActiveMQ.")
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
.required(false)
.dynamicallyModifiesClasspath(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
// ConnectionFactory specific properties
public static final PropertyDescriptor BROKER_URI = new PropertyDescriptor.Builder()
.name(BROKER)
.displayName("Broker URI")
.description("URI pointing to the network location of the JMS Message broker. Example for ActiveMQ: "
+ "'tcp://myhost:61616'. Examples for IBM MQ: 'myhost(1414)' and 'myhost01(1414),myhost02(1414)'")
.addValidator(new NonEmptyBrokerURIValidator())
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL connections.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
static {
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(CONNECTION_FACTORY_IMPL, CLIENT_LIB_DIR_PATH, BROKER_URI, SSL_CONTEXT_SERVICE));
}
protected volatile JMSConnectionFactoryHandler delegate;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return JMSConnectionFactoryProperties.getPropertyDescriptors();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName
+ "' property to be set on the provided ConnectionFactory implementation.")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamic(true)
.build();
return JMSConnectionFactoryProperties.getDynamicPropertyDescriptor(propertyDescriptorName);
}
@OnEnabled
public void onEnabled(ConfigurationContext context) {
delegate = new JMSConnectionFactoryHandler(context, getLogger());
}
@OnDisabled
public void onDisabled() {
delegate = null;
}
@Override
public ConnectionFactory getConnectionFactory() {
return delegate.getConnectionFactory();
}
@Override
public void resetConnectionFactory(ConnectionFactory cachedFactory) {
if (cachedFactory == connectionFactory) {
getLogger().debug("Resetting connection factory");
connectionFactory = null;
}
delegate.resetConnectionFactory(cachedFactory);
}
/**
* @return new instance of {@link ConnectionFactory}
*/
@Override
public ConnectionFactory getConnectionFactory() {
if (this.configured) {
return this.connectionFactory;
}
throw new IllegalStateException("ConnectionFactory can not be obtained unless "
+ "this ControllerService is configured. See onConfigure(ConfigurationContext) method.");
}
@OnEnabled
public void enable(ConfigurationContext context) {
try {
if (!this.configured) {
if (logger.isInfoEnabled()) {
logger.info("Configuring " + this.getClass().getSimpleName() + " for '"
+ context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue() + "' to be connected to '"
+ context.getProperty(BROKER_URI).evaluateAttributeExpressions().getValue() + "'");
}
this.createConnectionFactoryInstance(context);
this.setConnectionFactoryProperties(context);
}
this.configured = true;
} catch (Exception e) {
logger.error("Failed to configure " + this.getClass().getSimpleName(), e);
this.configured = false;
throw new IllegalStateException(e);
}
}
@OnDisabled
public void disable() {
this.connectionFactory = null;
this.configured = false;
}
/**
* This operation follows standard bean convention by matching property name
* to its corresponding 'setter' method. Once the method was located it is
* invoked to set the corresponding property to a value provided by during
* service configuration. For example, 'channel' property will correspond to
* 'setChannel(..) method and 'queueManager' property will correspond to
* setQueueManager(..) method with a single argument. The bean convention is also
* explained in user manual for this component with links pointing to
* documentation of various ConnectionFactories.
* <p>
* There are also few adjustments to accommodate well known brokers. For
* example ActiveMQ ConnectionFactory accepts address of the Message Broker
* in a form of URL while IBMs in the form of host/port pair(s).
* <p>
* This method will use the value retrieved from the 'BROKER_URI' static
* property as is. An exception to this if ConnectionFactory implementation
* is coming from IBM MQ and connecting to a stand-alone queue manager. In
* this case the Broker URI is expected to be entered as a colon separated
* host/port pair, which then is split on ':' and the resulting pair will be
* used to execute setHostName(..) and setPort(..) methods on the provided
* ConnectionFactory.
* <p>
* This method may need to be maintained and adjusted to accommodate other
* implementation of ConnectionFactory, but only for URL/Host/Port issue.
* All other properties are set as dynamic properties where user essentially
* provides both property name and value.
*
* @see <a href="http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html#setBrokerURL-java.lang.String-">setBrokerURL(String brokerURL)</a>
* @see <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsConnectionFactory.html#setServerUrl(java.lang.String)">setServerUrl(String serverUrl)</a>
* @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setHostName_java.lang.String_">setHostName(String hostname)</a>
* @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setPort_int_">setPort(int port)</a>
* @see <a href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setConnectionNameList_java.lang.String_">setConnectionNameList(String hosts)</a>
* @see #setProperty(String propertyName, Object propertyValue)
*/
void setConnectionFactoryProperties(ConfigurationContext context) {
if (context.getProperty(BROKER_URI).isSet()) {
String brokerValue = context.getProperty(BROKER_URI).evaluateAttributeExpressions().getValue();
String connectionFactoryValue = context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
if (connectionFactoryValue.startsWith("org.apache.activemq")) {
this.setProperty("brokerURL", brokerValue);
} else if (connectionFactoryValue.startsWith("com.tibco.tibjms")) {
this.setProperty("serverUrl", brokerValue);
} else {
String[] brokerList = brokerValue.split(",");
if (connectionFactoryValue.startsWith("com.ibm.mq.jms")) {
List<String> ibmConList = new ArrayList<String>();
for (String broker : brokerList) {
String[] hostPort = broker.split(":");
if (hostPort.length == 2) {
ibmConList.add(hostPort[0]+"("+hostPort[1]+")");
} else {
ibmConList.add(broker);
}
}
this.setProperty("connectionNameList", String.join(",", ibmConList));
} else {
// Try to parse broker URI as colon separated host/port pair. Use first pair if multiple given.
String[] hostPort = brokerList[0].split(":");
if (hostPort.length == 2) {
// If broker URI indeed was colon separated host/port pair
this.setProperty("hostName", hostPort[0]);
this.setProperty("port", hostPort[1]);
}
}
}
}
SSLContextService sc = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sc != null) {
SSLContext ssl = sc.createSSLContext(ClientAuth.NONE);
this.setProperty("sSLSocketFactory", ssl.getSocketFactory());
}
List<Entry<PropertyDescriptor, String>> dynamicProperties = context.getProperties().entrySet().stream()
.filter(entry -> entry.getKey().isDynamic())
.collect(Collectors.toList());
for (Entry<PropertyDescriptor, String> entry : dynamicProperties) {
PropertyDescriptor descriptor = entry.getKey();
String propertyName = descriptor.getName();
String propertyValue = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
this.setProperty(propertyName, propertyValue);
}
}
/**
* Sets corresponding {@link ConnectionFactory}'s property to a
* 'propertyValue' by invoking a 'setter' method that corresponds to
* 'propertyName'. For example, 'channel' property will correspond to
* 'setChannel(..) method and 'queueManager' property will correspond to
* setQueueManager(..) method with a single argument.
* <p>
* NOTE: There is a limited type conversion to accommodate property value
* types since all NiFi configuration properties comes as String. It is
* accomplished by checking the argument type of the method and executing
* its corresponding conversion to target primitive (e.g., value 'true' will
* go thru Boolean.parseBoolean(propertyValue) if method argument is of type
* boolean). None-primitive values are not supported at the moment and will
* result in {@link IllegalArgumentException}. It is OK though since based
* on analysis of several ConnectionFactory implementation the all seem to
* follow bean convention and all their properties using Java primitives as
* arguments.
*/
void setProperty(String propertyName, Object propertyValue) {
String methodName = this.toMethodName(propertyName);
Method[] methods = Utils.findMethods(methodName, this.connectionFactory.getClass());
if (methods != null && methods.length > 0) {
try {
for (Method method : methods) {
Class<?> returnType = method.getParameterTypes()[0];
if (String.class.isAssignableFrom(returnType)) {
method.invoke(this.connectionFactory, propertyValue);
return;
} else if (int.class.isAssignableFrom(returnType)) {
method.invoke(this.connectionFactory, Integer.parseInt((String) propertyValue));
return;
} else if (long.class.isAssignableFrom(returnType)) {
method.invoke(this.connectionFactory, Long.parseLong((String) propertyValue));
return;
} else if (boolean.class.isAssignableFrom(returnType)) {
method.invoke(this.connectionFactory, Boolean.parseBoolean((String) propertyValue));
return;
}
}
methods[0].invoke(this.connectionFactory, propertyValue);
} catch (Exception e) {
throw new IllegalStateException("Failed to set property " + propertyName, e);
}
} else if (propertyName.equals("hostName")) {
this.setProperty("host", propertyValue); // try 'host' as another common convention.
}
}
/**
* Creates an instance of the {@link ConnectionFactory} from the provided
* 'CONNECTION_FACTORY_IMPL'.
*/
private void createConnectionFactoryInstance(ConfigurationContext context) {
String connectionFactoryImplName = context.getProperty(CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
this.connectionFactory = Utils.newDefaultInstance(connectionFactoryImplName);
}
/**
* Will convert propertyName to a method name following bean convention. For
* example, 'channel' property will correspond to 'setChannel method and
* 'queueManager' property will correspond to setQueueManager method name
*/
private String toMethodName(String propertyName) {
char c[] = propertyName.toCharArray();
c[0] = Character.toUpperCase(c[0]);
return "set" + new String(c);
}
/**
* {@link Validator} that ensures that brokerURI's length > 0 after EL
* evaluation
*/
static class NonEmptyBrokerURIValidator implements Validator {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
}
}
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.cf;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Hashtable;
import java.util.Set;
import static org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME;
import static org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties.JNDI_CREDENTIALS;
import static org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY;
import static org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties.JNDI_PRINCIPAL;
import static org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL;
/**
* Handler class to retrieve a JMS Connection Factory object via JNDI.
* The handler can be used from controller services and processors as well.
*/
public class JndiJmsConnectionFactoryHandler implements IJMSConnectionFactoryProvider {
private final PropertyContext context;
private final Set<PropertyDescriptor> propertyDescriptors;
private final ComponentLog logger;
private volatile ConnectionFactory connectionFactory;
public JndiJmsConnectionFactoryHandler(ConfigurationContext context, ComponentLog logger) {
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
public JndiJmsConnectionFactoryHandler(ProcessContext context, ComponentLog logger) {
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
@Override
public synchronized ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = lookupConnectionFactory();
} else {
logger.debug("Connection Factory has already been obtained from JNDI. Will return cached instance.");
}
return connectionFactory;
}
@Override
public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) {
if (cachedFactory == connectionFactory) {
logger.debug("Resetting connection factory");
connectionFactory = null;
}
}
private ConnectionFactory lookupConnectionFactory() {
try {
final String factoryName = context.getProperty(JNDI_CONNECTION_FACTORY_NAME).evaluateAttributeExpressions().getValue().trim();
logger.debug("Looking up Connection Factory with name [{}]", new Object[] {factoryName});
final Context initialContext = createInitialContext();
final Object factoryObject = initialContext.lookup(factoryName);
logger.debug("Obtained {} from JNDI", new Object[] {factoryObject});
if (factoryObject == null) {
throw new ProcessException("Got a null Factory Object from JNDI");
}
if (!(factoryObject instanceof ConnectionFactory)) {
throw new ProcessException("Successfully performed JNDI lookup with Object Name [" + factoryName + "] but the returned object is not a ConnectionFactory. " +
"Instead, is of type " + factoryObject.getClass() + " : " + factoryObject);
}
return (ConnectionFactory) instrumentWithClassLoader(factoryObject, Thread.currentThread().getContextClassLoader(), ConnectionFactory.class);
} catch (final NamingException ne) {
throw new ProcessException("Could not obtain JMS Connection Factory from JNDI", ne);
}
}
private Context createInitialContext() throws NamingException {
final Hashtable<String, String> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, context.getProperty(JNDI_INITIAL_CONTEXT_FACTORY).evaluateAttributeExpressions().getValue().trim());
env.put(Context.PROVIDER_URL, context.getProperty(JNDI_PROVIDER_URL).evaluateAttributeExpressions().getValue().trim());
final String principal = context.getProperty(JNDI_PRINCIPAL).evaluateAttributeExpressions().getValue();
if (principal != null) {
env.put(Context.SECURITY_PRINCIPAL, principal);
}
final String credentials = context.getProperty(JNDI_CREDENTIALS).getValue();
if (credentials != null) {
env.put(Context.SECURITY_CREDENTIALS, credentials);
}
propertyDescriptors.forEach(descriptor -> {
if (descriptor.isDynamic()) {
env.put(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
}
});
logger.debug("Creating Initial Context using JNDI Environment {}", new Object[] {env});
final Context initialContext = new InitialContext(env);
return initialContext;
}
private static Object instrumentWithClassLoader(final Object obj, final ClassLoader classLoader, final Class<?>... interfaces) {
final InvocationHandler invocationHandler = new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
final Thread thread = Thread.currentThread();
final ClassLoader currentClassLoader = thread.getContextClassLoader();
try {
thread.setContextClassLoader(classLoader);
return method.invoke(obj, args);
} finally {
thread.setContextClassLoader(currentClassLoader);
}
}
};
return Proxy.newProxyInstance(classLoader, interfaces, invocationHandler);
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.cf;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.Arrays;
import java.util.List;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
public class JndiJmsConnectionFactoryProperties {
public static final PropertyDescriptor JNDI_INITIAL_CONTEXT_FACTORY = new Builder()
.name("java.naming.factory.initial")
.displayName("JNDI Initial Context Factory Class")
.description("The fully qualified class name of the JNDI Initial Context Factory Class (java.naming.factory.initial).")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor JNDI_PROVIDER_URL = new Builder()
.name("java.naming.provider.url")
.displayName("JNDI Provider URL")
.description("The URL of the JNDI Provider to use (java.naming.provider.url).")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor JNDI_CONNECTION_FACTORY_NAME = new Builder()
.name("connection.factory.name")
.displayName("JNDI Name of the Connection Factory")
.description("The name of the JNDI Object to lookup for the Connection Factory.")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor JNDI_CLIENT_LIBRARIES = new Builder()
.name("naming.factory.libraries")
.displayName("JNDI / JMS Client Libraries")
.description("Specifies jar files and/or directories (defined as a comma separated list) to add to the ClassPath " +
"in order to load the JNDI / JMS client libraries.")
.required(false)
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor JNDI_PRINCIPAL = new Builder()
.name("java.naming.security.principal")
.displayName("JNDI Principal")
.description("The Principal to use when authenticating with JNDI (java.naming.security.principal).")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor JNDI_CREDENTIALS = new Builder()
.name("java.naming.security.credentials")
.displayName("JNDI Credentials")
.description("The Credentials to use when authenticating with JNDI (java.naming.security.credentials).")
.required(false)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Arrays.asList(
JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY,
JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL,
JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME,
JndiJmsConnectionFactoryProperties.JNDI_CLIENT_LIBRARIES,
JndiJmsConnectionFactoryProperties.JNDI_PRINCIPAL,
JndiJmsConnectionFactoryProperties.JNDI_CREDENTIALS
);
public static List<PropertyDescriptor> getPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
public static PropertyDescriptor getDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new Builder()
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.description("JNDI Initial Context Environment configuration for '" + propertyDescriptorName + "'")
.required(false)
.dynamic(true)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
}
}

View File

@ -21,214 +21,55 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.List;
import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
@Tags({"jms", "jndi", "messaging", "integration", "queue", "topic", "publish", "subscribe"})
@CapabilityDescription("Provides a service to lookup an existing JMS ConnectionFactory using the Java Naming and Directory Interface (JNDI).")
@DynamicProperty(
description = "In order to perform a JNDI Lookup, an Initial Context must be established. When this is done, an Environment can be established for the context. Any dynamic/user-defined property" +
" that is added to this Controller Service will be added as an Environment configuration/variable to this Context.",
name = "The name of a JNDI Initial Context environment variable.",
value = "The value of the JNDI Initial Context Environment variable.",
value = "The value of the JNDI Initial Context environment variable.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS", "org.apache.nifi.jms.cf.JMSConnectionFactoryProvider"})
public class JndiJmsConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition {
static final PropertyDescriptor INITIAL_NAMING_FACTORY_CLASS = new Builder()
.name("java.naming.factory.initial")
.displayName("Initial Naming Factory Class")
.description("The fully qualified class name of the Java Initial Naming Factory (java.naming.factory.initial).")
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
.build();
static final PropertyDescriptor NAMING_PROVIDER_URL = new Builder()
.name("java.naming.provider.url")
.displayName("Naming Provider URL")
.description("The URL of the JNDI Naming Provider to use")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor CONNECTION_FACTORY_NAME = new Builder()
.name("connection.factory.name")
.displayName("Connection Factory Name")
.description("The name of the JNDI Object to lookup for the Connection Factory")
.required(true)
.addValidator(NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor NAMING_FACTORY_LIBRARIES = new Builder()
.name("naming.factory.libraries")
.displayName("Naming Factory Libraries")
.description("Specifies .jar files or directories to add to the ClassPath in order to find the Initial Naming Factory Class")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.createListValidator(true, true, StandardValidators.createURLorFileValidator()))
.dynamicallyModifiesClasspath(true)
.build();
static final PropertyDescriptor PRINCIPAL = new Builder()
.name("java.naming.security.principal")
.displayName("JNDI Principal")
.description("The Principal to use when authenticating with JNDI")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor CREDENTIALS = new Builder()
.name("java.naming.security.credentials")
.displayName("Credentials")
.description("The Credentials to use when authenticating with JNDI")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.addValidator(Validator.VALID)
.sensitive(true)
.build();
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Arrays.asList(
INITIAL_NAMING_FACTORY_CLASS,
NAMING_PROVIDER_URL,
CONNECTION_FACTORY_NAME,
NAMING_FACTORY_LIBRARIES,
PRINCIPAL,
CREDENTIALS);
private ConnectionFactory connectionFactory;
private volatile JndiJmsConnectionFactoryHandler delegate;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
return JndiJmsConnectionFactoryProperties.getPropertyDescriptors();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new Builder()
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.description("JNDI Initial Context Environment configuration for '" + propertyDescriptorName + "'")
.required(false)
.dynamic(true)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
return JndiJmsConnectionFactoryProperties.getDynamicPropertyDescriptor(propertyDescriptorName);
}
@OnEnabled
public void onEnabled(ConfigurationContext context) {
delegate = new JndiJmsConnectionFactoryHandler(context, getLogger());
}
@OnDisabled
public void shutdown() {
connectionFactory = null;
public void onDisabled() {
delegate = null;
}
@Override
public synchronized void resetConnectionFactory(ConnectionFactory cachedFactory) {
if (cachedFactory == connectionFactory) {
getLogger().debug("Resetting connection factory");
connectionFactory = null;
}
public ConnectionFactory getConnectionFactory() {
return delegate.getConnectionFactory();
}
@Override
public synchronized ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = lookupConnectionFactory();
} else {
getLogger().debug("Connection Factory has already been obtained from JNDI. Will return cached instance.");
}
return connectionFactory;
}
private ConnectionFactory lookupConnectionFactory() {
try {
final ConfigurationContext context = getConfigurationContext();
final String factoryName = context.getProperty(CONNECTION_FACTORY_NAME).evaluateAttributeExpressions().getValue().trim();
getLogger().debug("Looking up Connection Factory with name [{}]", new Object[] {factoryName});
final Context initialContext = createInitialContext();
final Object factoryObject = initialContext.lookup(factoryName);
getLogger().debug("Obtained {} from JNDI", new Object[] {factoryObject});
if (factoryObject == null) {
throw new ProcessException("Got a null Factory Object from JNDI");
}
if (!(factoryObject instanceof ConnectionFactory)) {
throw new ProcessException("Successfully performed JNDI lookup with Object Name [" + factoryName + "] but the returned object is not a ConnectionFactory. " +
"Instead, is of type " + factoryObject.getClass() + " : " + factoryObject);
}
return (ConnectionFactory) instrumentWithClassLoader(factoryObject, Thread.currentThread().getContextClassLoader(), ConnectionFactory.class);
} catch (final NamingException ne) {
throw new ProcessException("Could not obtain JMS Connection Factory from JNDI", ne);
}
}
private Context createInitialContext() throws NamingException {
final ConfigurationContext context = getConfigurationContext();
final Hashtable<String, String> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, context.getProperty(INITIAL_NAMING_FACTORY_CLASS).evaluateAttributeExpressions().getValue().trim());
env.put(Context.PROVIDER_URL, context.getProperty(NAMING_PROVIDER_URL).evaluateAttributeExpressions().getValue().trim());
final String principal = context.getProperty(PRINCIPAL).evaluateAttributeExpressions().getValue();
if (principal != null) {
env.put(Context.SECURITY_PRINCIPAL, principal);
}
final String credentials = context.getProperty(CREDENTIALS).getValue();
if (credentials != null) {
env.put(Context.SECURITY_CREDENTIALS, credentials);
}
context.getProperties().keySet().forEach(descriptor -> {
if (descriptor.isDynamic()) {
env.put(descriptor.getName(), context.getProperty(descriptor).evaluateAttributeExpressions().getValue());
}
});
getLogger().debug("Creating Initial Context using JNDI Environment {}", new Object[] {env});
final Context initialContext = new InitialContext(env);
return initialContext;
}
public static Object instrumentWithClassLoader(final Object obj, final ClassLoader classLoader, final Class<?>... interfaces) {
final InvocationHandler invocationHandler = new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
final Thread thread = Thread.currentThread();
final ClassLoader currentClassLoader = thread.getContextClassLoader();
try {
thread.setContextClassLoader(classLoader);
return method.invoke(obj, args);
} finally {
thread.setContextClassLoader(currentClassLoader);
}
}
};
return Proxy.newProxyInstance(classLoader, interfaces, invocationHandler);
public void resetConnectionFactory(ConnectionFactory cachedFactory) {
delegate.resetConnectionFactory(cachedFactory);
}
}

View File

@ -18,10 +18,19 @@ package org.apache.nifi.jms.processors;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.jms.cf.IJMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryHandler;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryHandler;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -37,10 +46,13 @@ import javax.jms.ConnectionFactory;
import javax.jms.Message;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* Base JMS processor to support implementation of JMS producers and consumers.
@ -102,15 +114,7 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
.defaultValue("1")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
static final PropertyDescriptor MESSAGE_BODY = new PropertyDescriptor.Builder()
.name("message-body-type")
.displayName("Message Body Type")
.description("The type of JMS message body to construct.")
.required(true)
.defaultValue(BYTES_MESSAGE)
.allowableValues(BYTES_MESSAGE, TEXT_MESSAGE)
.build();
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("character-set")
.displayName("Character Set")
.description("The name of the character set to use to construct or interpret TextMessages")
@ -119,61 +123,57 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
.defaultValue(Charset.defaultCharset().name())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor ALLOW_ILLEGAL_HEADER_CHARS = new PropertyDescriptor.Builder()
.name("allow-illegal-chars-in-jms-header-names")
.displayName("Allow Illegal Characters in Header Names")
.description("Specifies whether illegal characters in header names should be sent to the JMS broker. " +
"Usually hyphens and full-stops.")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new PropertyDescriptor.Builder()
.name("attributes-to-send-as-jms-headers-regex")
.displayName("Attributes to Send as JMS Headers (Regex)")
.description("Specifies the Regular Expression that determines the names of FlowFile attributes that" +
" should be sent as JMS Headers")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.defaultValue(".*")
.required(true)
.build();
static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder()
.name("Connection Factory Service")
.description("The Controller Service that is used to obtain ConnectionFactory")
.required(true)
.description("The Controller Service that is used to obtain Connection Factory. Alternatively, the 'JNDI *' or the 'JMS *' properties " +
"can also be be used to configure the Connection Factory.")
.required(false)
.identifiesControllerService(JMSConnectionFactoryProviderDefinition.class)
.build();
static final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
static final List<PropertyDescriptor> JNDI_JMS_CF_PROPERTIES = Collections.unmodifiableList(
JndiJmsConnectionFactoryProperties.getPropertyDescriptors().stream()
.map(pd -> new PropertyDescriptor.Builder()
.fromPropertyDescriptor(pd)
.required(false)
.build())
.collect(Collectors.toList())
);
static final List<PropertyDescriptor> JMS_CF_PROPERTIES = Collections.unmodifiableList(
JMSConnectionFactoryProperties.getPropertyDescriptors().stream()
.map(pd -> new PropertyDescriptor.Builder()
.fromPropertyDescriptor(pd)
.required(false)
.build())
.collect(Collectors.toList())
);
private volatile IJMSConnectionFactoryProvider connectionFactoryProvider;
private volatile BlockingQueue<T> workerPool;
private final AtomicInteger clientIdCounter = new AtomicInteger(1);
static {
propertyDescriptors.add(CF_SERVICE);
propertyDescriptors.add(DESTINATION);
propertyDescriptors.add(DESTINATION_TYPE);
propertyDescriptors.add(USER);
propertyDescriptors.add(PASSWORD);
propertyDescriptors.add(CLIENT_ID);
propertyDescriptors.add(SESSION_CACHE_SIZE);
propertyDescriptors.add(MESSAGE_BODY);
propertyDescriptors.add(CHARSET);
propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
}
protected static String getClientId(ProcessContext context) {
return context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Additional configuration property for the Connection Factory")
.name(propertyDescriptorName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return new ConnectionFactoryConfigValidator(validationContext).validateConnectionFactoryConfig();
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
@ -194,17 +194,16 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
//and reconnection is required.
if (worker == null || !worker.isValid()){
getLogger().debug("Worker is invalid. Will try re-create... ");
final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
try {
if (worker != null) {
worker.shutdown();
}
// Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
connectionFactoryProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
worker = buildTargetResource(context);
}catch(Exception e) {
getLogger().error("Failed to rebuild: " + cfProvider);
getLogger().error("Failed to rebuild: " + connectionFactoryProvider);
worker = null;
}
}
@ -218,6 +217,24 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
}
}
@OnScheduled
public void setupConnectionFactoryProvider(final ProcessContext context) {
if (context.getProperty(CF_SERVICE).isSet()) {
connectionFactoryProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
} else if (context.getProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME).isSet()) {
connectionFactoryProvider = new JndiJmsConnectionFactoryHandler(context, getLogger());
} else if (context.getProperty(JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL).isSet()) {
connectionFactoryProvider = new JMSConnectionFactoryHandler(context, getLogger());
} else {
throw new ProcessException("No Connection Factory configured.");
}
}
@OnUnscheduled
public void shutdownConnectionFactoryProvider(final ProcessContext context) {
connectionFactoryProvider = null;
}
@OnScheduled
public void setupWorkerPool(final ProcessContext context) {
workerPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
@ -257,8 +274,7 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
* {@link JmsTemplate} used by this Processor.
*/
private T buildTargetResource(ProcessContext context) {
final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
final ConnectionFactory connectionFactory = cfProvider.getConnectionFactory();
final ConnectionFactory connectionFactory = connectionFactoryProvider.getConnectionFactory();
final UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new UserCredentialsConnectionFactoryAdapter();
cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory);
@ -291,4 +307,93 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
}
}
static class ConnectionFactoryConfigValidator {
private final ValidationContext validationContext;
private final PropertyValue connectionFactoryServiceProperty;
private final PropertyValue jndiInitialContextFactoryProperty;
private final PropertyValue jmsConnectionFactoryImplProperty;
ConnectionFactoryConfigValidator(ValidationContext validationContext) {
this.validationContext = validationContext;
connectionFactoryServiceProperty = validationContext.getProperty(CF_SERVICE);
jndiInitialContextFactoryProperty = validationContext.getProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY);
jmsConnectionFactoryImplProperty = validationContext.getProperty(JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL);
}
List<ValidationResult> validateConnectionFactoryConfig() {
List<ValidationResult> results = new ArrayList<>();
if (!(connectionFactoryServiceProperty.isSet() || jndiInitialContextFactoryProperty.isSet() || jmsConnectionFactoryImplProperty.isSet())) {
results.add(new ValidationResult.Builder()
.subject("Connection Factory config")
.valid(false)
.explanation(String.format("either '%s', '%s' or '%s' must be specified.", CF_SERVICE.getDisplayName(),
JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY.getDisplayName(), JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL.getDisplayName()))
.build());
} else if (connectionFactoryServiceProperty.isSet()) {
if (hasLocalJndiJmsConnectionFactoryConfig()) {
results.add(new ValidationResult.Builder()
.subject("Connection Factory config")
.valid(false)
.explanation(String.format("cannot set both '%s' and 'JNDI *' properties.", CF_SERVICE.getDisplayName()))
.build());
}
if (hasLocalJMSConnectionFactoryConfig()) {
results.add(new ValidationResult.Builder()
.subject("Connection Factory config")
.valid(false)
.explanation(String.format("cannot set both '%s' and 'JMS *' properties.", CF_SERVICE.getDisplayName()))
.build());
}
} else if (hasLocalJndiJmsConnectionFactoryConfig() && hasLocalJMSConnectionFactoryConfig()) {
results.add(new ValidationResult.Builder()
.subject("Connection Factory config")
.valid(false)
.explanation("cannot set both 'JNDI *' and 'JMS *' properties.")
.build());
} else if (jndiInitialContextFactoryProperty.isSet()) {
validateLocalConnectionFactoryConfig(JndiJmsConnectionFactoryProperties.getPropertyDescriptors(), JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY, results);
} else if (jmsConnectionFactoryImplProperty.isSet()) {
validateLocalConnectionFactoryConfig(JMSConnectionFactoryProperties.getPropertyDescriptors(), JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, results);
}
return results;
}
private boolean hasLocalJndiJmsConnectionFactoryConfig() {
return hasLocalConnectionFactoryConfig(JndiJmsConnectionFactoryProperties.getPropertyDescriptors());
}
private boolean hasLocalJMSConnectionFactoryConfig() {
return hasLocalConnectionFactoryConfig(JMSConnectionFactoryProperties.getPropertyDescriptors());
}
private boolean hasLocalConnectionFactoryConfig(List<PropertyDescriptor> localConnectionFactoryProperties) {
for (PropertyDescriptor propertyDescriptor : localConnectionFactoryProperties) {
PropertyValue propertyValue = validationContext.getProperty(propertyDescriptor);
if (propertyValue.isSet()) {
return true;
}
}
return false;
}
private void validateLocalConnectionFactoryConfig(List<PropertyDescriptor> localConnectionFactoryProperties, PropertyDescriptor indicatorProperty, List<ValidationResult> results) {
for (PropertyDescriptor propertyDescriptor : localConnectionFactoryProperties) {
if (propertyDescriptor.isRequired()) {
PropertyValue propertyValue = validationContext.getProperty(propertyDescriptor);
if (!propertyValue.isSet()) {
results.add(new ValidationResult.Builder()
.subject("Connection Factory config")
.valid(false)
.explanation(String.format("'%s' must be specified when '%s' has been configured.", propertyDescriptor.getDisplayName(), indicatorProperty.getDisplayName()))
.build());
}
}
}
}
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.jms.processors;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -81,6 +82,10 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = ConsumeJMS.JMS_MESSAGETYPE, description = "The JMS message type, can be TextMessage, BytesMessage, ObjectMessage, MapMessage or StreamMessage)."),
@WritesAttribute(attribute = "other attributes", description = "Each message property is written to an attribute.")
})
@DynamicProperty(name = "The name of a Connection Factory configuration property.", value = "The value of a given Connection Factory configuration property.",
description = "Additional configuration property for the Connection Factory. It can be used when the Connection Factory is being configured via the 'JNDI *' or the 'JMS *'" +
"properties of the processor. For more information, see the Additional Details page.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
public static final String JMS_MESSAGETYPE = "jms.messagetype";
@ -161,21 +166,25 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> thisPropertyDescriptors;
private final static List<PropertyDescriptor> propertyDescriptors;
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.addAll(propertyDescriptors);
_propertyDescriptors.remove(MESSAGE_BODY);
_propertyDescriptors.remove(ALLOW_ILLEGAL_HEADER_CHARS);
_propertyDescriptors.remove(ATTRIBUTES_AS_HEADERS_REGEX);
_propertyDescriptors.add(CF_SERVICE);
_propertyDescriptors.add(DESTINATION);
_propertyDescriptors.add(DESTINATION_TYPE);
_propertyDescriptors.add(USER);
_propertyDescriptors.add(PASSWORD);
_propertyDescriptors.add(CLIENT_ID);
_propertyDescriptors.add(SESSION_CACHE_SIZE);
// change the validator on CHARSET property
_propertyDescriptors.remove(CHARSET);
PropertyDescriptor CHARSET_WITH_EL_VALIDATOR_PROPERTY = new PropertyDescriptor.Builder().fromPropertyDescriptor(CHARSET)
PropertyDescriptor charsetWithELValidatorProperty = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CHARSET)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR_WITH_EVALUATION)
.build();
_propertyDescriptors.add(CHARSET_WITH_EL_VALIDATOR_PROPERTY);
_propertyDescriptors.add(charsetWithELValidatorProperty);
_propertyDescriptors.add(ACKNOWLEDGEMENT_MODE);
_propertyDescriptors.add(DURABLE_SUBSCRIBER);
@ -183,7 +192,11 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
_propertyDescriptors.add(SUBSCRIPTION_NAME);
_propertyDescriptors.add(TIMEOUT);
_propertyDescriptors.add(ERROR_QUEUE);
thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
_propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
_propertyDescriptors.addAll(JMS_CF_PROPERTIES);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
@ -292,7 +305,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return thisPropertyDescriptors;
return propertyDescriptors;
}
/**

View File

@ -17,6 +17,7 @@
package org.apache.nifi.jms.processors;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -26,6 +27,8 @@ import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.processor.ProcessContext;
@ -33,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
@ -42,9 +46,11 @@ import javax.jms.Destination;
import javax.jms.Message;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@ -80,10 +86,42 @@ import java.util.regex.Pattern;
+ " `delay.type` with value `integer` will cause a JMS message property `delay` to be sent as an Integer rather than a String. Supported types are boolean, byte,"
+ " short, integer, long, float, double, and string (which is the default).")
})
@DynamicProperty(name = "The name of a Connection Factory configuration property.", value = "The value of a given Connection Factory configuration property.",
description = "Additional configuration property for the Connection Factory. It can be used when the Connection Factory is being configured via the 'JNDI *' or the 'JMS *'" +
"properties of the processor. For more information, see the Additional Details page.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso(value = { ConsumeJMS.class, JMSConnectionFactoryProvider.class })
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
static final PropertyDescriptor MESSAGE_BODY = new PropertyDescriptor.Builder()
.name("message-body-type")
.displayName("Message Body Type")
.description("The type of JMS message body to construct.")
.required(true)
.defaultValue(BYTES_MESSAGE)
.allowableValues(BYTES_MESSAGE, TEXT_MESSAGE)
.build();
static final PropertyDescriptor ALLOW_ILLEGAL_HEADER_CHARS = new PropertyDescriptor.Builder()
.name("allow-illegal-chars-in-jms-header-names")
.displayName("Allow Illegal Characters in Header Names")
.description("Specifies whether illegal characters in header names should be sent to the JMS broker. " +
"Usually hyphens and full-stops.")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new PropertyDescriptor.Builder()
.name("attributes-to-send-as-jms-headers-regex")
.displayName("Attributes to Send as JMS Headers (Regex)")
.description("Specifies the Regular Expression that determines the names of FlowFile attributes that" +
" should be sent as JMS Headers")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.defaultValue(".*")
.required(true)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are sent to the JMS destination are routed to this relationship")
@ -93,6 +131,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
.description("All FlowFiles that cannot be sent to JMS destination are routed to this relationship")
.build();
private static final List<PropertyDescriptor> propertyDescriptors;
private final static Set<Relationship> relationships;
/*
@ -100,6 +139,27 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
* Will also create a Set of relationships
*/
static {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(CF_SERVICE);
_propertyDescriptors.add(DESTINATION);
_propertyDescriptors.add(DESTINATION_TYPE);
_propertyDescriptors.add(USER);
_propertyDescriptors.add(PASSWORD);
_propertyDescriptors.add(CLIENT_ID);
_propertyDescriptors.add(SESSION_CACHE_SIZE);
_propertyDescriptors.add(MESSAGE_BODY);
_propertyDescriptors.add(CHARSET);
_propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS);
_propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX);
_propertyDescriptors.addAll(JNDI_JMS_CF_PROPERTIES);
_propertyDescriptors.addAll(JMS_CF_PROPERTIES);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
_relationships.add(REL_SUCCESS);
_relationships.add(REL_FAILURE);
@ -169,6 +229,11 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
/**
*
*/

View File

@ -47,7 +47,7 @@
The mandatory configuration property is:
</p>
<ul>
<li><b>MQ ConnectionFactory Implementation</b> - A fully qualified name of the JMS <i>ConnectionFactory</i>
<li><b>JMS Connection Factory Implementation</b> - The fully qualified name of the JMS <i>ConnectionFactory</i>
implementation class. For example:
<ul>
<li>Apache ActiveMQ - <a href="http://activemq.apache.org/maven/5.15.9/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html" target="_blank">org.apache.activemq.ActiveMQConnectionFactory</a></li>
@ -60,12 +60,11 @@
The following static configuration properties are optional but required in many cases:
</p>
<ul>
<li><b>MQ Client Libraries path</b> - Path to the directory with additional resources (i.e. JARs,
configuration files, etc.) to be added to the classpath. Such resources typically represent target client
libraries for the <i>ConnectionFactory</i> implementation. It is optional if you are using Apache ActiveMQ since
its libraries are included with this component.
<li><b>JMS Client Libraries</b> - Path to the directory with additional resources (eg. JARs,
configuration files, etc.) to be added to the classpath (defined as a comma separated list of values). Such resources typically represent target JMS client
libraries for the <i>ConnectionFactory</i> implementation.
</li>
<li><b>Broker URI</b> - URI pointing to the network location of the JMS Message broker. For example:
<li><b>JMS Broker URI</b> - URI pointing to the network location of the JMS Message broker. For example:
<ul>
<li>Apache ActiveMQ - <i>tcp://myhost:1234</i> for single broker and
<i>failover:(tcp://myhost01:1234,tcp://myhost02:1234)</i> for multiple brokers.
@ -108,26 +107,26 @@
<th>Comments</th>
</tr>
<tr>
<td>MQ ConnectionFactory Implementation</td>
<td>JMS Connection Factory Implementation</td>
<td>com.ibm.mq.jms.MQQueueConnectionFactory</td>
<td>Static</td>
<td>Vendor provided implementation of QueueConnectionFactory</td>
</tr>
<tr>
<td>MQ Client Libraries path (i.e. /usr/jms/lib)</td>
<td>JMS Client Libraries</td>
<td>/opt/mqm/java/lib</td>
<td>Static</td>
<td>Default installation path of client JAR files on Linux systems</td>
</tr>
<tr>
<td>Broker URI</td>
<td>JMS Broker URI</td>
<td>mqhost01(1414),mqhost02(1414)</td>
<td>Static</td>
<td><a href="https://www.ibm.com/support/knowledgecenter/ro/SSAW57_9.0.0/com.ibm.websphere.nd.multiplatform.doc/ae/ucli_pqcfm.html#MQTopicConnectionFactory_enterporthostname" target="_blank">Connection Name List syntax</a>.
Colon separated host/port pair(s) is also supported</td>
</tr>
<tr>
<td>SSL Context Service</td>
<td>JMS SSL Context Service</td>
<td></td>
<td>Static</td>
<td>Only required if using SSL/TLS</td>

View File

@ -34,18 +34,19 @@
<pre>
<code>
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_NAMING_FACTORY_CLASS); // Value for this comes from the "Initial Naming Factory Class" property.
env.put(Context.PROVIDER_URL, NAMING_PROVIDER_URL); // Value for this comes from the "Naming Provider URL" property.
env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_INITIAL_CONTEXT_FACTORY); // Value for this comes from the "JNDI Initial Context Factory Class" property.
env.put(Context.PROVIDER_URL, JNDI_PROVIDER_URL); // Value for this comes from the "JNDI Provider URL" property.
env.put("My-Environment-Variable", "Environment-Variable-Value"); // This is accomplished by added a user-defined property with name "My-Environment-Variable" and value "Environment-Variable-Value"
Context initialContext = new InitialContext(env);
ConnectionFactory connectionFactory = initialContext.lookup(CONNECTION_FACTORY_NAME); // Value for Connection Factory name comes from "Connection Factory Name" property
ConnectionFactory connectionFactory = initialContext.lookup(JNDI_CONNECTION_FACTORY_NAME); // Value for this comes from the "JNDI Name of the Connection Factory" property
</code>
</pre>
<p>
It is also important to note that, in order for this to work, the class named by the Initial Naming Factory Class must be available on the classpath.
In NiFi, this is accomplished by setting the "Naming Factory Libraries" property to point to one or more .jar files or directories (comma-separated values).
It is also important to note that, in order for this to work, the class named by the "JNDI Initial Context Factory Class" must be available on the classpath.
The JMS provider specific client classes (like the class of the Connection Factory object to be retrieved from JNDI) must also be available on the classpath.
In NiFi, this is accomplished by setting the "JNDI / JMS Client Libraries" property to point to one or more .jar files or directories (comma-separated values).
</p>
<p>
@ -67,27 +68,27 @@ ConnectionFactory connectionFactory = initialContext.lookup(CONNECTION_FACTORY_N
</thead>
<tbody>
<tr>
<td>Initial Naming Factory Class</td>
<td>JNDI Initial Context Factory Class</td>
<td>org.apache.activemq.jndi.ActiveMQInitialContextFactory</td>
</tr>
<tr>
<td>Naming Provider URL</td>
<td>JNDI Provider URL</td>
<td>tcp://jms-broker:61616</td>
</tr>
<tr>
<td>Connection Factory Name</td>
<td>JNDI Name of the Connection Factory</td>
<td>ConnectionFactory</td>
</tr>
<tr>
<td>Naming Factory Libraries</td>
<td>JNDI / JMS Client Libraries</td>
<td>/opt/apache-activemq-5.15.2/lib/</td>
</tr>
</tbody>
</table>
<p>
The above example assumes that there exists as host that is accessible with hostname "jms-broker" and that is running Apache ActiveMQ on port 61616 and also that
the jar containing the org.apache.activemq.jndi.ActiveMQInitialContextFactory class can be found within the /opt/apache-activemq-5.15.2/lib/ directory.
The above example assumes that there exists a host that is accessible with hostname "jms-broker" and that is running Apache ActiveMQ on port 61616 and also that
the jar(s) containing the org.apache.activemq.jndi.ActiveMQInitialContextFactory class and the other JMS client classes can be found within the /opt/apache-activemq-5.15.2/lib/ directory.
</p>
</body>

View File

@ -49,9 +49,30 @@
<li><b>Destination Type</b> - [REQUIRED] the type of the <i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC'
Usually provided by the administrator. Defaults to 'QUEUE'.
</li>
<li><b>Connection Factory Service</b> - [REQUIRED] link to a pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider.
</li>
</ol>
<h3>Connection Factory Configuration</h3>
There are multiple ways to configure the Connection Factory for the processor:
<ul>
<li><b>Connection Factory Service</b> property - link to a pre-configured controller service (<i>JndiJmsConnectionFactoryProvider</i> or <i>JMSConnectionFactoryProvider</i>)
</li>
<li><b>JNDI *</b> properties - processor level configuration, the properties are the same as the properties of <i>JndiJmsConnectionFactoryProvider</i> controller service,
the dynamic properties can also be used in this case
</li>
<li><b>JMS *</b> properties - processor level configuration, the properties are the same as the properties of <i>JMSConnectionFactoryProvider</i> controller service,
the dynamic properties can also be used in this case
</li>
</ul>
<p>
The preferred way is to use the Connection Factory Service property and a pre-configured controller service. It is also the most convenient method, because it is enough
to configure the controller service once and then it can be used in multiple processors.
</p>
<p>
However, some JMS client libraries may not work with the controller services due to incompatible Java ClassLoader handling between the 3rd party JMS client library and NiFi.
Should you encounter <i>java.lang.ClassCastException</i> errors when using the controller services, please try to configure the Connection Factory via the 'JNDI *' or
the 'JMS *' and the dynamic properties of the processor.
For more details on these properties, see the documentation of the corresponding controller service (<i>JndiJmsConnectionFactoryProvider</i> for 'JNDI *' and
<i>JMSConnectionFactoryProvider</i> for 'JMS *').
</p>
</body>
</html>

View File

@ -52,9 +52,30 @@
<li><b>Destination Type</b> - [REQUIRED] the type of the <i>javax.jms.Destination</i>. Could be one of 'QUEUE' or 'TOPIC'
Usually provided by the administrator. Defaults to 'QUEUE'.
</li>
<li><b>Connection Factory Service</b> - [REQUIRED] link to a pre-configured instance of org.apache.nifi.jms.cf.JMSConnectionFactoryProvider.
</li>
</ol>
<h3>Connection Factory Configuration</h3>
There are multiple ways to configure the Connection Factory for the processor:
<ul>
<li><b>Connection Factory Service</b> property - link to a pre-configured controller service (<i>JndiJmsConnectionFactoryProvider</i> or <i>JMSConnectionFactoryProvider</i>)
</li>
<li><b>JNDI *</b> properties - processor level configuration, the properties are the same as the properties of <i>JndiJmsConnectionFactoryProvider</i> controller service,
the dynamic properties can also be used in this case
</li>
<li><b>JMS *</b> properties - processor level configuration, the properties are the same as the properties of <i>JMSConnectionFactoryProvider</i> controller service,
the dynamic properties can also be used in this case
</li>
</ul>
<p>
The preferred way is to use the Connection Factory Service property and a pre-configured controller service. It is also the most convenient method, because it is enough
to configure the controller service once and then it can be used in multiple processors.
</p>
<p>
However, some JMS client libraries may not work with the controller services due to incompatible Java ClassLoader handling between the 3rd party JMS client library and NiFi.
Should you encounter <i>java.lang.ClassCastException</i> errors when using the controller services, please try to configure the Connection Factory via the 'JNDI *' or
the 'JMS *' and the dynamic properties of the processor.
For more details on these properties, see the documentation of the corresponding controller service (<i>JndiJmsConnectionFactoryProvider</i> for 'JNDI *' and
<i>JMSConnectionFactoryProvider</i> for 'JMS *').
</p>
</body>
</html>

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.cf;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import java.util.HashMap;
import java.util.Map;
/**
* Sub-class of {@link JMSConnectionFactoryHandler} only for testing purpose
*/
public class JMSConnectionFactoryHandlerForTest extends JMSConnectionFactoryHandler {
private Map<String, Object> configuredProperties = new HashMap<>();
public JMSConnectionFactoryHandlerForTest(ConfigurationContext context, ComponentLog logger) {
super(context, logger);
}
@Override
void setProperty(String propertyName, Object propertyValue) {
configuredProperties.put(propertyName, propertyValue);
}
public Map<String, Object> getConfiguredProperties() {
return configuredProperties;
}
}

View File

@ -18,32 +18,22 @@ package org.apache.nifi.jms.cf;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.controller.ConfigurationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Sub-class of {@link JMSConnectionFactoryProvider} only for testing purpose
*/
public class JMSConnectionFactoryProviderForTest extends JMSConnectionFactoryProvider {
private static Logger logger = LoggerFactory.getLogger(JMSConnectionFactoryProviderForTest.class);
private Map<String, Object> setProperties = new HashMap<>();
@OnEnabled
@Override
public void enable(ConfigurationContext context) {
setConnectionFactoryProperties(context);
public void onEnabled(ConfigurationContext context) {
delegate = new JMSConnectionFactoryHandlerForTest(context, getLogger());
delegate.setConnectionFactoryProperties();
}
@Override
void setProperty(String propertyName, Object propertyValue) {
setProperties.put(propertyName, propertyValue);
}
public Map<String, Object> getSetProperties() {
return setProperties;
public Map<String, Object> getConfiguredProperties() {
return ((JMSConnectionFactoryHandlerForTest) delegate).getConfiguredProperties();
}
}

View File

@ -17,8 +17,11 @@
package org.apache.nifi.jms.cf;
import com.google.common.collect.ImmutableMap;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@ -28,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@ -84,9 +88,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "foo");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, "foo");
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.assertNotValid(cfProvider);
}
@ -102,9 +106,9 @@ public class JMSConnectionFactoryProviderTest {
runner.setVariable("broker.uri", SINGLE_TEST_BROKER_WITH_SCHEME_AND_IP);
runner.setVariable("client.lib", dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "${broker.uri}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "${client.lib}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, "${broker.uri}");
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, "${client.lib}");
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -120,9 +124,9 @@ public class JMSConnectionFactoryProviderTest {
runner.setVariable("broker.uri", SINGLE_TEST_BROKER_WITH_SCHEME_AND_IP);
runner.setVariable("client.lib", allDummyResources);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, "${broker.uri}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, "${client.lib}");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, "${broker.uri}");
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, "${client.lib}");
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
@ -134,7 +138,14 @@ public class JMSConnectionFactoryProviderTest {
@Test(expected = IllegalStateException.class)
public void validateGetConnectionFactoryFailureIfServiceNotConfigured() {
new JMSConnectionFactoryProvider().getConnectionFactory();
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider() {
@Override
protected ComponentLog getLogger() {
return new MockComponentLog("cfProvider", this);
}
};
cfProvider.onEnabled(new MockConfigurationContext(Collections.emptyMap(), null));
cfProvider.getConnectionFactory();
}
@Test
@ -144,9 +155,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -158,9 +169,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -172,9 +183,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TEST_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TEST_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -186,9 +197,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -200,9 +211,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -214,9 +225,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TIBCO_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TIBCO_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -228,9 +239,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TIBCO_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TIBCO_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -242,9 +253,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_IBM_MQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_IBM_MQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -256,9 +267,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -270,9 +281,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -284,9 +295,9 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.assertValid(cfProvider);
}
@ -298,13 +309,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("hostName", HOSTNAME, "port", PORT));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("hostName", HOSTNAME, "port", PORT));
}
@Test
@ -314,13 +325,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of());
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of());
}
@Test
@ -330,13 +341,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TEST_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TEST_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("hostName", "myhost01", "port", "1234"));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("hostName", "myhost01", "port", "1234"));
}
@Test
@ -346,13 +357,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("brokerURL", SINGLE_ACTIVEMQ_BROKER));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("brokerURL", SINGLE_ACTIVEMQ_BROKER));
}
@Test
@ -362,13 +373,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, ACTIVEMQ_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("brokerURL", MULTIPLE_ACTIVEMQ_BROKERS));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("brokerURL", MULTIPLE_ACTIVEMQ_BROKERS));
}
@Test
@ -378,13 +389,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TIBCO_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TIBCO_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("serverUrl", SINGLE_TIBCO_BROKER));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("serverUrl", SINGLE_TIBCO_BROKER));
}
@Test
@ -394,13 +405,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_TIBCO_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TIBCO_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TIBCO_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("serverUrl", MULTIPLE_TIBCO_BROKERS));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("serverUrl", MULTIPLE_TIBCO_BROKERS));
}
@Test
@ -410,13 +421,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_IBM_MQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_IBM_MQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("connectionNameList", SINGLE_IBM_MQ_BROKER));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", SINGLE_IBM_MQ_BROKER));
}
@Test
@ -426,13 +437,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
}
@Test
@ -442,13 +453,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
}
@Test
@ -458,13 +469,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
}
@Test
@ -474,13 +485,13 @@ public class JMSConnectionFactoryProviderTest {
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
runner.addControllerService(controllerServiceId, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, IBM_MQ_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("connectionNameList", HOSTNAME + "(" + PORT + ")"));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", HOSTNAME + "(" + PORT + ")"));
}
@Test
@ -492,13 +503,13 @@ public class JMSConnectionFactoryProviderTest {
runner.setVariable("test", "dynamicValue");
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CLIENT_LIB_DIR_PATH, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProvider.CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, TEST_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, "dynamicProperty", "${test}");
runner.enableControllerService(cfProvider);
assertEquals(cfProvider.getSetProperties(), ImmutableMap.of("dynamicProperty", "dynamicValue", "hostName", HOSTNAME, "port", PORT));
assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("dynamicProperty", "dynamicValue", "hostName", HOSTNAME, "port", PORT));
}
}

View File

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProvider;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.jms.Connection;
import javax.jms.JMSException;
/**
* Tests for the different Connection Factory configurations of {@link PublishJMS} and {@link ConsumeJMS}:
* - JndiJmsConnectionFactoryProvider controller service
* - JMSConnectionFactoryProvider controller service
* - local JndiJmsConnectionFactory configuration on the processor
* - local JMSConnectionFactory configuration on the processor
*/
public class ConnectionFactoryConfigIT {
private static final String CONTROLLER_SERVICE_ID = "cfProvider";
private static final String BROKER_URL = "vm://test-broker?broker.persistent=false";
private static final String PROP_JNDI_INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
private static final String PROP_JNDI_PROVIDER_URL = BROKER_URL;
private static final String PROP_JNDI_CONNECTION_FACTORY_NAME = "ConnectionFactory";
private static final String PROP_JMS_CONNECTION_FACTORY_IMPL = "org.apache.activemq.ActiveMQConnectionFactory";
private static final String PROP_JMS_BROKER_URI = BROKER_URL;
private static final String TEST_MESSAGE = "test-message";
private static Connection bootstrapConnection;
private TestRunner publisher;
private TestRunner consumer;
@BeforeClass
public static void beforeClass() throws JMSException {
// start in-VM broker
bootstrapConnection = new ActiveMQConnectionFactory(BROKER_URL).createConnection();
}
@AfterClass
public static void afterClass() throws JMSException {
// stop in-VM broker
bootstrapConnection.close();
}
@Before
public void before() {
publisher = TestRunners.newTestRunner(PublishJMS.class);
consumer = TestRunners.newTestRunner(ConsumeJMS.class);
}
@Test
public void testJndiJmsConnectionFactoryControllerService() throws InitializationException {
String queueName = "queue-jndi-service";
configureJndiJmsConnectionFactoryControllerService(publisher, queueName);
configureJndiJmsConnectionFactoryControllerService(consumer, queueName);
executeProcessors();
assertResult();
}
@Test
public void testJMSConnectionFactoryControllerService() throws InitializationException {
String queueName = "queue-jms-service";
configureJMSConnectionFactoryControllerService(publisher, queueName);
configureJMSConnectionFactoryControllerService(consumer, queueName);
executeProcessors();
assertResult();
}
@Test
public void testLocalJndiJmsConnectionFactoryConfig() {
String queueName = "queue-jndi-local";
configureLocalJndiJmsConnectionFactory(publisher, queueName);
configureLocalJndiJmsConnectionFactory(consumer, queueName);
executeProcessors();
assertResult();
}
@Test
public void testLocalJMSConnectionFactoryConfig() {
String queueName = "queue-jms-local";
configureLocalJMSConnectionFactory(publisher, queueName);
configureLocalJMSConnectionFactory(consumer, queueName);
executeProcessors();
assertResult();
}
private void configureJndiJmsConnectionFactoryControllerService(TestRunner runner, String queueName) throws InitializationException {
JndiJmsConnectionFactoryProvider cfProvider = new JndiJmsConnectionFactoryProvider();
runner.addControllerService(CONTROLLER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY, PROP_JNDI_INITIAL_CONTEXT_FACTORY);
runner.setProperty(cfProvider, JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL, PROP_JNDI_PROVIDER_URL);
runner.setProperty(cfProvider, JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME, PROP_JNDI_CONNECTION_FACTORY_NAME);
runner.enableControllerService(cfProvider);
runner.setProperty(AbstractJMSProcessor.CF_SERVICE, CONTROLLER_SERVICE_ID);
runner.setProperty(AbstractJMSProcessor.DESTINATION, queueName);
}
private void configureJMSConnectionFactoryControllerService(TestRunner runner, String queueName) throws InitializationException {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(CONTROLLER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, PROP_JMS_CONNECTION_FACTORY_IMPL);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, PROP_JMS_BROKER_URI);
runner.enableControllerService(cfProvider);
runner.setProperty(AbstractJMSProcessor.CF_SERVICE, CONTROLLER_SERVICE_ID);
runner.setProperty(AbstractJMSProcessor.DESTINATION, queueName);
}
private void configureLocalJndiJmsConnectionFactory(TestRunner runner, String queueName) {
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY, PROP_JNDI_INITIAL_CONTEXT_FACTORY);
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL, PROP_JNDI_PROVIDER_URL);
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME, PROP_JNDI_CONNECTION_FACTORY_NAME);
runner.setProperty(AbstractJMSProcessor.DESTINATION, queueName);
}
private void configureLocalJMSConnectionFactory(TestRunner runner, String queueName) {
runner.setProperty(JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, PROP_JMS_CONNECTION_FACTORY_IMPL);
runner.setProperty(JMSConnectionFactoryProperties.JMS_BROKER_URI, PROP_JMS_BROKER_URI);
runner.setProperty(AbstractJMSProcessor.DESTINATION, queueName);
}
private void executeProcessors() {
publisher.enqueue(TEST_MESSAGE);
publisher.run();
consumer.run();
}
private void assertResult() {
publisher.assertAllFlowFilesTransferred(PublishJMS.REL_SUCCESS, 1);
publisher.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS).get(0).assertContentEquals(TEST_MESSAGE);
consumer.assertAllFlowFilesTransferred(ConsumeJMS.REL_SUCCESS, 1);
consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS).get(0).assertContentEquals(TEST_MESSAGE);
}
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
/**
* Tests for {@link AbstractJMSProcessor.ConnectionFactoryConfigValidator}
*/
public class ConnectionFactoryConfigValidatorTest {
private static final String CONTROLLER_SERVICE_ID = "cfProvider";
private static final String PROP_JNDI_INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
private static final String PROP_JNDI_PROVIDER_URL = "tcp://myhost:61616";
private static final String PROP_JNDI_CONNECTION_FACTORY_NAME = "ConnectionFactory";
private static final String PROP_JMS_CONNECTION_FACTORY_IMPL = "org.apache.activemq.ActiveMQConnectionFactory";
private static final String PROP_JMS_BROKER_URI = "tcp://myhost:61616";
private TestRunner runner;
@Before
public void setUp() {
runner = TestRunners.newTestRunner(PublishJMS.class);
runner.setProperty(PublishJMS.DESTINATION, "myQueue");
}
@Test
public void testNotValidWhenNoConnectionFactoryConfigured() {
runner.assertNotValid();
}
@Test
public void testValidControllerServiceConfig() throws InitializationException {
configureControllerService();
runner.assertValid();
}
@Test
public void testNotValidWhenControllerServiceConfiguredButLocalJndiJmsConnectionFactoryPropertyAlsoSpecified() throws InitializationException {
configureControllerService();
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME, PROP_JNDI_CONNECTION_FACTORY_NAME);
runner.assertNotValid();
}
@Test
public void testNotValidWhenControllerServiceConfiguredButLocalJMSConnectionFactoryPropertyAlsoSpecified() throws InitializationException {
configureControllerService();
runner.setProperty(JMSConnectionFactoryProperties.JMS_BROKER_URI, PROP_JMS_BROKER_URI);
runner.assertNotValid();
}
@Test
public void testValidLocalJndiJmsConnectionFactoryConfig() {
configureLocalJndiJmsConnectionFactory();
runner.assertValid();
}
@Test
public void testNotValidWhenLocalJndiJmsConnectionFactoryConfiguredButLocalJMSConnectionFactoryPropertyAlsoSpecified() {
configureLocalJndiJmsConnectionFactory();
runner.setProperty(JMSConnectionFactoryProperties.JMS_BROKER_URI, PROP_JMS_BROKER_URI);
runner.assertNotValid();
}
@Test
public void testNotValidWhenNoProviderUrlSpecifiedForLocalJndiJmsConnectionFactory() {
configureLocalJndiJmsConnectionFactory();
runner.removeProperty(JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL);
runner.assertNotValid();
}
@Test
public void testNotValidWhenNoConnectionFactoryNameSpecifiedForLocalJndiJmsConnectionFactory() {
configureLocalJndiJmsConnectionFactory();
runner.removeProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME);
runner.assertNotValid();
}
@Test
public void testValidLocalJMSConnectionFactory() {
configureLocalJMSConnectionFactory();
runner.assertValid();
}
@Test
public void testNotValidWhenLocalJMSConnectionFactoryConfiguredButLocalJndiJmsConnectionFactoryPropertyAlsoSpecified() {
configureLocalJMSConnectionFactory();
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME, PROP_JNDI_CONNECTION_FACTORY_NAME);
runner.assertNotValid();
}
private void configureControllerService() throws InitializationException {
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
runner.addControllerService(CONTROLLER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, PROP_JMS_CONNECTION_FACTORY_IMPL);
runner.enableControllerService(cfProvider);
runner.setProperty(AbstractJMSProcessor.CF_SERVICE, CONTROLLER_SERVICE_ID);
}
private void configureLocalJndiJmsConnectionFactory() {
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_INITIAL_CONTEXT_FACTORY, PROP_JNDI_INITIAL_CONTEXT_FACTORY);
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_PROVIDER_URL, PROP_JNDI_PROVIDER_URL);
runner.setProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME, PROP_JNDI_CONNECTION_FACTORY_NAME);
}
private void configureLocalJMSConnectionFactory() {
runner.setProperty(JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, PROP_JMS_CONNECTION_FACTORY_IMPL);
}
}