NIFI-5741: When returning a ConnectionFactory from the JndiJmsConnectionFactoryProvider, ensure that we wrap the ConnectionFactory so that any calls to the ConnectionFactory happen within the context of the Controller Service's Class Loader

This closes #3106.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2018-10-23 14:42:32 -04:00 committed by Bryan Bende
parent 10479a5a2a
commit ebead820f9
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
5 changed files with 94 additions and 70 deletions

View File

@ -34,6 +34,9 @@ import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Hashtable;
import java.util.List;
@ -49,7 +52,7 @@ import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDA
value = "The value of the JNDI Initial Context Environment variable.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS", "org.apache.nifi.jms.cf.JMSConnectionFactoryProvider"})
public class JndiJmsConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition{
public class JndiJmsConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition {
static final PropertyDescriptor INITIAL_NAMING_FACTORY_CLASS = new Builder()
.name("java.naming.factory.initial")
@ -146,10 +149,37 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService
return connectionFactory;
}
private ConnectionFactory lookupConnectionFactory() {
try {
final ConfigurationContext context = getConfigurationContext();
final String factoryName = context.getProperty(CONNECTION_FACTORY_NAME).evaluateAttributeExpressions().getValue().trim();
getLogger().debug("Looking up Connection Factory with name [{}]", new Object[] {factoryName});
final Context initialContext = createInitialContext();
final Object factoryObject = initialContext.lookup(factoryName);
getLogger().debug("Obtained {} from JNDI", new Object[] {factoryObject});
if (factoryObject == null) {
throw new ProcessException("Got a null Factory Object from JNDI");
}
if (!(factoryObject instanceof ConnectionFactory)) {
throw new ProcessException("Successfully performed JNDI lookup with Object Name [" + factoryName + "] but the returned object is not a ConnectionFactory. " +
"Instead, is of type " + factoryObject.getClass() + " : " + factoryObject);
}
return (ConnectionFactory) instrumentWithClassLoader(factoryObject, Thread.currentThread().getContextClassLoader(), ConnectionFactory.class);
} catch (final NamingException ne) {
throw new ProcessException("Could not obtain JMS Connection Factory from JNDI", ne);
}
}
private Context createInitialContext() throws NamingException {
final ConfigurationContext context = getConfigurationContext();
final Hashtable<String, String> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY, context.getProperty(INITIAL_NAMING_FACTORY_CLASS).evaluateAttributeExpressions().getValue().trim());
env.put(Context.PROVIDER_URL, context.getProperty(NAMING_PROVIDER_URL).evaluateAttributeExpressions().getValue().trim());
@ -170,25 +200,27 @@ public class JndiJmsConnectionFactoryProvider extends AbstractControllerService
}
});
final String factoryName = context.getProperty(CONNECTION_FACTORY_NAME).evaluateAttributeExpressions().getValue().trim();
getLogger().debug("Looking up Connection Factory with name [{}] using JNDI Environment {}", new Object[] {factoryName, env});
getLogger().debug("Creating Initial Context using JNDI Environment {}", new Object[] {env});
final Context initialContext = new InitialContext(env);
final Object factoryObject = initialContext.lookup(factoryName);
getLogger().debug("Obtained {} from JNDI", new Object[] {factoryObject});
if (factoryObject == null) {
throw new ProcessException("Got a null Factory Object from JNDI");
}
if (!(factoryObject instanceof ConnectionFactory)) {
throw new ProcessException("Successfully performed JNDI lookup with Object Name [" + factoryName + "] but the returned object is not a ConnectionFactory. " +
"Instead, is of type " + factoryObject.getClass() + " : " + factoryObject);
return initialContext;
}
return (ConnectionFactory) factoryObject;
} catch (final NamingException ne) {
throw new ProcessException("Could not obtain JMS Connection Factory from JNDI", ne);
public static Object instrumentWithClassLoader(final Object obj, final ClassLoader classLoader, final Class<?>... interfaces) {
final InvocationHandler invocationHandler = new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
final Thread thread = Thread.currentThread();
final ClassLoader currentClassLoader = thread.getContextClassLoader();
try {
thread.setContextClassLoader(classLoader);
return method.invoke(obj, args);
} finally {
thread.setContextClassLoader(currentClassLoader);
}
}
};
return Proxy.newProxyInstance(classLoader, interfaces, invocationHandler);
}
}

View File

@ -16,15 +16,6 @@
*/
package org.apache.nifi.jms.processors;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@ -41,6 +32,14 @@ import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Base JMS processor to support implementation of JMS producers and consumers.
*

View File

@ -16,17 +16,6 @@
*/
package org.apache.nifi.jms.processors;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.jms.Session;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -50,6 +39,16 @@ import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* Consuming JMS processor which upon each invocation of
* {@link #onTrigger(ProcessContext, ProcessSession)} method will construct a

View File

@ -16,11 +16,12 @@
*/
package org.apache.nifi.jms.processors;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.nifi.logging.ComponentLog;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@ -30,13 +31,11 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.nifi.logging.ComponentLog;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
/**
* Generic publisher of messages to JMS compliant messaging system.
@ -64,10 +63,6 @@ final class JMSPublisher extends JMSWorker {
});
}
void publish(String destinationName, String messageText) {
this.publish(destinationName, messageText, null);
}
void publish(String destinationName, String messageText, final Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(destinationName, new MessageCreator() {
@Override

View File

@ -16,22 +16,13 @@
*/
package org.apache.nifi.jms.processors;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.jms.Destination;
import javax.jms.Message;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -47,6 +38,14 @@ import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import javax.jms.Destination;
import javax.jms.Message;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* An implementation of JMS Message publishing {@link Processor} which upon each
* invocation of {@link #onTrigger(ProcessContext, ProcessSession)} method will