NIFI-4834: Updated AbstractJMSProcessor to use a separate SingleConnectionFactory per concurrent task instead of sharing one across the entire processor.

This closes #2445.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Mark Payne 2018-01-31 11:50:42 -05:00 committed by Andy LoPresto
parent b5938062a8
commit 3ca7c3e7a1
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
10 changed files with 420 additions and 408 deletions

View File

@ -18,9 +18,13 @@ package org.apache.nifi.jms.processors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionFactory;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
@ -38,9 +42,7 @@ import org.springframework.jms.core.JmsTemplate;
/**
* Base JMS processor to support implementation of JMS producers and consumers.
*
* @param <T>
* the type of {@link JMSWorker} which could be {@link JMSPublisher}
* or {@link JMSConsumer}
* @param <T> the type of {@link JMSWorker} which could be {@link JMSPublisher} or {@link JMSConsumer}
* @see PublishJMS
* @see ConsumeJMS
* @see JMSConnectionFactoryProviderDefinition
@ -48,7 +50,6 @@ import org.springframework.jms.core.JmsTemplate;
abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcessor {
static final String QUEUE = "QUEUE";
static final String TOPIC = "TOPIC";
static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
@ -90,14 +91,13 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.build();
static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Session Cache size")
.description("The maximum limit for the number of cached Sessions.")
.required(true)
.description("This property is deprecated and no longer has any effect on the Processor. It will be removed in a later version.")
.required(false)
.defaultValue("1")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.build();
// ConnectionFactoryProvider ControllerService
static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder()
.name("Connection Factory Service")
.description("The Controller Service that is used to obtain ConnectionFactory")
@ -106,11 +106,9 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.build();
static final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
private volatile BlockingQueue<T> workerPool;
private final AtomicInteger clientIdCounter = new AtomicInteger(1);
/*
* Will ensure that list of PropertyDescriptors is build only once, since
* all other lifecycle methods are invoked multiple times.
*/
static {
propertyDescriptors.add(CF_SERVICE);
propertyDescriptors.add(DESTINATION);
@ -121,47 +119,35 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
propertyDescriptors.add(SESSION_CACHE_SIZE);
}
protected volatile T targetResource;
private volatile CachingConnectionFactory cachingConnectionFactory;
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
/**
* Builds target resource ({@link JMSPublisher} or {@link JMSConsumer}) upon
* first invocation while delegating to the sub-classes ( {@link PublishJMS}
* or {@link ConsumeJMS}) via
* {@link #rendezvousWithJms(ProcessContext, ProcessSession)} method.
*/
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
synchronized (this) {
this.buildTargetResource(context);
}
this.rendezvousWithJms(context, session);
T worker = workerPool.poll();
if (worker == null) {
worker = buildTargetResource(context);
}
/**
* Will destroy the instance of {@link CachingConnectionFactory} and sets
* 'targetResource' to null;
*/
rendezvousWithJms(context, session, worker);
workerPool.offer(worker);
}
@OnScheduled
public void setupWorkerPool(final ProcessContext context) {
workerPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
}
@OnStopped
public void close() {
if (this.cachingConnectionFactory != null) {
this.cachingConnectionFactory.destroy();
T worker;
while ((worker = workerPool.poll()) != null) {
worker.shutdown();
}
this.targetResource = null;
}
@Override
public String toString() {
return this.getClass().getSimpleName() + " - " + this.targetResource;
}
/**
@ -169,23 +155,16 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
* {@link #onTrigger(ProcessContext, ProcessSession)} operation. It is
* implemented by sub-classes to perform {@link Processor} specific
* functionality.
*
* @param context
* instance of {@link ProcessContext}
* @param session
* instance of {@link ProcessSession}
*/
protected abstract void rendezvousWithJms(ProcessContext context, ProcessSession session) throws ProcessException;
protected abstract void rendezvousWithJms(ProcessContext context, ProcessSession session, T jmsWorker) throws ProcessException;
/**
* Finishes building one of the {@link JMSWorker} subclasses T.
*
* @param jmsTemplate instance of {@link JmsTemplate}
*
* @see JMSPublisher
* @see JMSConsumer
*/
protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext);
protected abstract T finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext);
/**
* This method essentially performs initialization of this Processor by
@ -195,30 +174,30 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
* in an instance of the {@link CachingConnectionFactory} used to construct
* {@link JmsTemplate} used by this Processor.
*/
private void buildTargetResource(ProcessContext context) {
if (this.targetResource == null) {
JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
ConnectionFactory connectionFactory = cfProvider.getConnectionFactory();
private T buildTargetResource(ProcessContext context) {
final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
final ConnectionFactory connectionFactory = cfProvider.getConnectionFactory();
UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new UserCredentialsConnectionFactoryAdapter();
final UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new UserCredentialsConnectionFactoryAdapter();
cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory);
cfCredentialsAdapter.setUsername(context.getProperty(USER).getValue());
cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue());
this.cachingConnectionFactory = new CachingConnectionFactory(cfCredentialsAdapter);
this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue()));
final CachingConnectionFactory cachingFactory = new CachingConnectionFactory(cfCredentialsAdapter);
String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
if (clientId != null) {
this.cachingConnectionFactory.setClientId(clientId);
clientId = clientId + "-" + clientIdCounter.getAndIncrement();
cachingFactory.setClientId(clientId);
}
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
jmsTemplate.setConnectionFactory(cachingFactory);
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
// set of properties that may be good candidates for exposure via configuration
jmsTemplate.setReceiveTimeout(1000);
this.targetResource = this.finishBuildingTargetResource(jmsTemplate, context);
}
return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
}
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.jms.processors;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -44,8 +42,8 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
/**
@ -146,35 +144,34 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
* 'success' {@link Relationship}.
*/
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
final boolean durable = durableBoolean == null ? false : durableBoolean;
final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
final boolean shared = sharedBoolean == null ? false : sharedBoolean;
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
this.targetResource.consume(destinationName, durable, shared, subscriptionName, new ConsumerCallback(){
consumer.consume(destinationName, durable, shared, subscriptionName, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response != null){
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(response.getMessageBody());
if (response == null) {
return;
}
});
Map<String, Object> jmsHeaders = response.getMessageHeaders();
Map<String, Object> jmsProperties = Collections.<String, Object>unmodifiableMap(response.getMessageProperties());
FlowFile flowFile = processSession.create();
flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody()));
final Map<String, String> jmsHeaders = response.getMessageHeaders();
final Map<String, String> jmsProperties = response.getMessageProperties();
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession);
flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession);
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
} else {
context.yield();
}
}
});
}
@ -183,23 +180,17 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
* Will create an instance of {@link JMSConsumer}
*/
@Override
protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) {
protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
jmsTemplate.setSessionAcknowledgeMode(ackMode);
return new JMSConsumer(jmsTemplate, this.getLogger());
return new JMSConsumer(connectionFactory, jmsTemplate, this.getLogger());
}
/**
*
*/
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
/**
*
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return thisPropertyDescriptors;
@ -211,11 +202,12 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
* copied values of JMS attributes will be "stringified" via
* String.valueOf(attribute).
*/
private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, Object> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
Map<String, String> attributes = new HashMap<String, String>();
for (Entry<String, Object> entry : jmsAttributes.entrySet()) {
attributes.put(entry.getKey(), String.valueOf(entry.getValue()));
private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, String> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
Map<String, String> attributes = new HashMap<>();
for (Entry<String, String> entry : jmsAttributes.entrySet()) {
attributes.put(entry.getKey(), entry.getValue());
}
flowFile = processSession.putAllAttributes(flowFile, attributes);
return flowFile;
}

View File

@ -33,6 +33,7 @@ import javax.jms.Topic;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.JmsHeaders;
@ -43,63 +44,54 @@ import org.springframework.jms.support.JmsUtils;
*/
final class JMSConsumer extends JMSWorker {
/**
* Creates an instance of this consumer
*
* @param jmsTemplate
* instance of {@link JmsTemplate}
* @param processLog
* instance of {@link ComponentLog}
*/
JMSConsumer(JmsTemplate jmsTemplate, ComponentLog processLog) {
super(jmsTemplate, processLog);
if (this.processLog.isInfoEnabled()) {
this.processLog.info("Created Message Consumer for '" + jmsTemplate.toString() + "'.");
}
JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) {
super(connectionFactory, jmsTemplate, logger);
logger.debug("Created Message Consumer for '{}'", new Object[] {jmsTemplate});
}
/**
*
*/
public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
public Void doInJms(Session session) throws JMSException {
/*
* We need to call recover to ensure that in in the event of
* abrupt end or exception the current session will stop message
* delivery and restarts with the oldest unacknowledged message
*/
session.recover();
boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain();
Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(
session, destinationName, isPubSub);
MessageConsumer msgConsumer;
private MessageConsumer createMessageConsumer(final Session session, final String destinationName, final boolean durable, final boolean shared, final String subscriberName) throws JMSException {
final boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain();
final Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSub);
if (isPubSub) {
if (shared) {
try {
if (durable) {
msgConsumer = session.createSharedDurableConsumer((Topic)destination, subscriberName);
return session.createSharedDurableConsumer((Topic) destination, subscriberName);
} else {
msgConsumer = session.createSharedConsumer((Topic)destination, subscriberName);
return session.createSharedConsumer((Topic) destination, subscriberName);
}
} catch (AbstractMethodError e) {
throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e);
}
} else {
if (durable) {
msgConsumer = session.createDurableConsumer((Topic)destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
return session.createDurableConsumer((Topic) destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
} else {
msgConsumer = session.createConsumer((Topic)destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
}
} else {
msgConsumer = session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain());
}
Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null;
}
public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
public Void doInJms(final Session session) throws JMSException {
// We need to call recover to ensure that in in the event of
// abrupt end or exception the current session will stop message
// delivery and restarts with the oldest unacknowledged message
session.recover();
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
try {
final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null;
if (message != null) {
byte[] messageBody = null;
if (message instanceof TextMessage) {
@ -110,10 +102,12 @@ final class JMSConsumer extends JMSWorker {
throw new IllegalStateException("Message type other then TextMessage and BytesMessage are "
+ "not supported at the moment");
}
Map<String, Object> messageHeaders = extractMessageHeaders(message);
Map<String, String> messageProperties = extractMessageProperties(message);
final Map<String, String> messageHeaders = extractMessageHeaders(message);
final Map<String, String> messageProperties = extractMessageProperties(message);
response = new JMSResponse(messageBody, messageHeaders, messageProperties);
}
// invoke the processor callback (regardless if it's null,
// so the processor can yield) as part of this inJMS call
// and ACK message *only* after its successful invocation
@ -125,19 +119,18 @@ final class JMSConsumer extends JMSWorker {
} finally {
JmsUtils.closeMessageConsumer(msgConsumer);
}
return null;
}
}, true);
}
/**
*
*/
@SuppressWarnings("unchecked")
private Map<String, String> extractMessageProperties(Message message) {
Map<String, String> properties = new HashMap<>();
private Map<String, String> extractMessageProperties(final Message message) {
final Map<String, String> properties = new HashMap<>();
try {
Enumeration<String> propertyNames = message.getPropertyNames();
final Enumeration<String> propertyNames = message.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String propertyName = propertyNames.nextElement();
properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName)));
@ -148,15 +141,10 @@ final class JMSConsumer extends JMSWorker {
return properties;
}
/**
*
*
*/
private Map<String, Object> extractMessageHeaders(Message message) {
// even though all values are Strings in current impl, it may change in
// the future, so keeping it <String, Object>
Map<String, Object> messageHeaders = new HashMap<>();
try {
private Map<String, String> extractMessageHeaders(final Message message) throws JMSException {
final Map<String, String> messageHeaders = new HashMap<>();
messageHeaders.put(JmsHeaders.DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode()));
messageHeaders.put(JmsHeaders.EXPIRATION, String.valueOf(message.getJMSExpiration()));
messageHeaders.put(JmsHeaders.PRIORITY, String.valueOf(message.getJMSPriority()));
@ -170,19 +158,16 @@ final class JMSConsumer extends JMSWorker {
if (replyToDestinationName != null) {
messageHeaders.put(JmsHeaders.REPLY_TO, replyToDestinationName);
}
String destinationName = this.retrieveDestinationName(message.getJMSDestination(), JmsHeaders.DESTINATION);
if (destinationName != null) {
messageHeaders.put(JmsHeaders.DESTINATION, destinationName);
}
} catch (Exception e) {
throw new IllegalStateException("Failed to extract JMS Headers", e);
}
return messageHeaders;
}
/**
*
*/
private String retrieveDestinationName(Destination destination, String headerName) {
String destinationName = null;
if (destination != null) {
@ -196,17 +181,14 @@ final class JMSConsumer extends JMSWorker {
return destinationName;
}
/**
*
*/
static class JMSResponse {
private final byte[] messageBody;
private final Map<String, Object> messageHeaders;
private final Map<String, String> messageHeaders;
private final Map<String, String> messageProperties;
JMSResponse(byte[] messageBody, Map<String, Object> messageHeaders, Map<String, String> messageProperties) {
JMSResponse(byte[] messageBody, Map<String, String> messageHeaders, Map<String, String> messageProperties) {
this.messageBody = messageBody;
this.messageHeaders = Collections.unmodifiableMap(messageHeaders);
this.messageProperties = Collections.unmodifiableMap(messageProperties);
@ -216,7 +198,7 @@ final class JMSConsumer extends JMSWorker {
return this.messageBody;
}
public Map<String, Object> getMessageHeaders() {
public Map<String, String> getMessageHeaders() {
return this.messageHeaders;
}

View File

@ -27,10 +27,8 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.SessionCallback;
@ -41,45 +39,22 @@ import org.springframework.jms.support.JmsHeaders;
*/
final class JMSPublisher extends JMSWorker {
private final static Logger logger = LoggerFactory.getLogger(JMSPublisher.class);
/**
* Creates an instance of this publisher
*
* @param jmsTemplate
* instance of {@link JmsTemplate}
* @param processLog
* instance of {@link ComponentLog}
*/
JMSPublisher(JmsTemplate jmsTemplate, ComponentLog processLog) {
super(jmsTemplate, processLog);
if (logger.isInfoEnabled()) {
logger.info("Created Message Publisher for '" + jmsTemplate.toString() + "'.");
}
JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) {
super(connectionFactory, jmsTemplate, processLog);
processLog.debug("Created Message Publisher for {}", new Object[] {jmsTemplate});
}
/**
*
* @param messageBytes byte array representing contents of the message
*/
void publish(String destinationName, byte[] messageBytes) {
this.publish(destinationName, messageBytes, null);
}
/**
*
* @param messageBytes
* byte array representing contents of the message
* @param flowFileAttributes
* Map representing {@link FlowFile} attributes.
*/
void publish(final String destinationName, final byte[] messageBytes, final Map<String, String> flowFileAttributes) {
this.jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage message = session.createBytesMessage();
message.writeBytes(messageBytes);
if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) {
// set message headers and properties
for (Entry<String, String> entry : flowFileAttributes.entrySet()) {
@ -121,18 +96,12 @@ final class JMSPublisher extends JMSWorker {
});
}
/**
*
*/
private void logUnbuildableDestination(String destinationName, String headerName) {
this.processLog.warn("Failed to determine destination type from destination name '" + destinationName
+ "'. The '"
+ headerName + "' will not be set.");
this.processLog.warn("Failed to determine destination type from destination name '{}'. The '{}' header will not be set.", new Object[] {destinationName, headerName});
}
/**
*
*/
private Destination buildDestination(final String destinationName) {
Destination destination;
if (destinationName.toLowerCase().contains("topic")) {
@ -152,6 +121,7 @@ final class JMSPublisher extends JMSWorker {
} else {
destination = null;
}
return destination;
}
}

View File

@ -21,6 +21,7 @@ import java.nio.channels.Channel;
import javax.jms.Connection;
import org.apache.nifi.logging.ComponentLog;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
@ -33,8 +34,9 @@ import org.springframework.jms.core.JmsTemplate;
abstract class JMSWorker {
protected final JmsTemplate jmsTemplate;
protected final ComponentLog processLog;
private final CachingConnectionFactory connectionFactory;
/**
* Creates an instance of this worker initializing it with JMS
@ -44,14 +46,16 @@ abstract class JMSWorker {
* @param jmsTemplate the instance of {@link JmsTemplate}
* @param processLog the instance of {@link ComponentLog}
*/
public JMSWorker(JmsTemplate jmsTemplate, ComponentLog processLog) {
public JMSWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) {
this.connectionFactory = connectionFactory;
this.jmsTemplate = jmsTemplate;
this.processLog = processLog;
}
/**
*
*/
public void shutdown() {
connectionFactory.destroy();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + "[destination:" + this.jmsTemplate.getDefaultDestinationName()

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.jms.processors;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@ -37,8 +35,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
@ -91,20 +89,19 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
* Upon success the incoming {@link FlowFile} is transferred to the'success'
* {@link Relationship} and upon failure FlowFile is penalized and
* transferred to the 'failure' {@link Relationship}
*
*/
@Override
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException {
protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException {
FlowFile flowFile = processSession.get();
if (flowFile != null) {
try {
String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue();
this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
processSession.transfer(flowFile, REL_SUCCESS);
processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
} catch (Exception e) {
processSession.transfer(flowFile, REL_FAILURE);
this.getLogger().error("Failed while sending message to JMS via " + this.targetResource, e);
this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
context.yield();
}
}
@ -122,8 +119,8 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
* Will create an instance of {@link JMSPublisher}
*/
@Override
protected JMSPublisher finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) {
return new JMSPublisher(jmsTemplate, this.getLogger());
protected JMSPublisher finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
return new JMSPublisher(connectionFactory, jmsTemplate, this.getLogger());
}
/**
@ -131,12 +128,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
*/
private byte[] extractMessageBody(FlowFile flowFile, ProcessSession session) {
final byte[] messageContent = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, messageContent, true);
}
});
session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true));
return messageContent;
}
}

View File

@ -52,13 +52,13 @@ public class CommonTest {
}
static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"vm://localhost?broker.persistent=false");
connectionFactory = new CachingConnectionFactory(connectionFactory);
ConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final ConnectionFactory connectionFactory = new CachingConnectionFactory(activeMqConnectionFactory);
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setPubSubDomain(pubSub);
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
jmsTemplate.setReceiveTimeout(10L);
return jmsTemplate;
}
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
@ -29,17 +33,14 @@ import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsHeaders;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConsumeJMSTest {
@Test
public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception {
final String destinationName = "cooQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher sender = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
try {
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final Map<String, String> senderAttributes = new HashMap<>();
senderAttributes.put("filename", "message.txt");
senderAttributes.put("attribute_from_sender", "some value");
@ -67,7 +68,8 @@ public class ConsumeJMSTest {
successFF.assertContentEquals("Hey dude!".getBytes());
String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
assertNotNull(sourceDestination);
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
}

View File

@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
@ -48,7 +50,8 @@ public class JMSPublisherConsumerTest {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
try {
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
publisher.publish(destinationName, "hellomq".getBytes());
Message receivedMessage = jmsTemplate.receive(destinationName);
@ -56,16 +59,18 @@ public class JMSPublisherConsumerTest {
byte[] bytes = new byte[7];
((BytesMessage) receivedMessage).readBytes(bytes);
assertEquals("hellomq", new String(bytes));
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
try {
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put("foo", "foo");
flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
@ -77,8 +82,10 @@ public class JMSPublisherConsumerTest {
assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
/**
* At the moment the only two supported message types are TextMessage and
@ -91,6 +98,7 @@ public class JMSPublisherConsumerTest {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
@ -98,8 +106,7 @@ public class JMSPublisherConsumerTest {
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
try {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
@ -116,6 +123,7 @@ public class JMSPublisherConsumerTest {
final String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
jmsTemplate.send(destinationName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
@ -127,7 +135,7 @@ public class JMSPublisherConsumerTest {
}
});
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override
@ -141,18 +149,79 @@ public class JMSPublisherConsumerTest {
});
assertTrue(callbackInvoked.get());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
@Test(timeout = 2000000)
public void testMultipleThreads() throws Exception {
String destinationName = "testQueue";
JmsTemplate publishTemplate = CommonTest.buildJmsTemplateForDestination(false);
final CountDownLatch consumerTemplateCloseCount = new CountDownLatch(4);
try {
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), publishTemplate, mock(ComponentLog.class));
for (int i = 0; i < 4000; i++) {
publisher.publish(destinationName, String.valueOf(i).getBytes(StandardCharsets.UTF_8));
}
final AtomicInteger msgCount = new AtomicInteger(0);
final ConsumerCallback callback = new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
msgCount.incrementAndGet();
}
};
final Thread[] threads = new Thread[4];
for (int i = 0; i < 4; i++) {
final Thread t = new Thread(() -> {
JmsTemplate consumeTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
consumer.consume(destinationName, false, false, null, callback);
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
consumerTemplateCloseCount.countDown();
}
});
threads[i] = t;
t.start();
}
int iterations = 0;
while (msgCount.get() < 4000) {
Thread.sleep(10L);
if (++iterations % 100 == 0) {
System.out.println(msgCount.get() + " messages received so far");
}
}
} finally {
((CachingConnectionFactory) publishTemplate.getConnectionFactory()).destroy();
consumerTemplateCloseCount.await();
}
}
@Test(timeout = 10000)
public void validateMessageRedeliveryWhenNotAcked() throws Exception {
String destinationName = "testQueue";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class));
try {
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
publisher.publish(destinationName, "1".getBytes(StandardCharsets.UTF_8));
publisher.publish(destinationName, "2".getBytes(StandardCharsets.UTF_8));
JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class));
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@ -164,36 +233,46 @@ public class JMSPublisherConsumerTest {
}
});
} catch (Exception e) {
// ignore
// expected
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
return;
}
callbackInvoked.set(true);
assertEquals("1", new String(response.getMessageBody()));
}
});
} catch (Exception e) {
// ignore
}
assertTrue(callbackInvoked.get());
callbackInvoked.set(false);
// receiving next message and fail again
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
return;
}
callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody()));
throw new RuntimeException("intentional to avoid explicit ack");
}
});
}
} catch (Exception e) {
// ignore
}
@ -202,16 +281,24 @@ public class JMSPublisherConsumerTest {
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
return;
}
callbackInvoked.set(true);
assertEquals("2", new String(response.getMessageBody()));
}
});
}
} catch (Exception e) {
// ignore
}
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
}

View File

@ -39,7 +39,7 @@ import static org.mockito.Mockito.when;
public class PublishJMSTest {
@Test
@Test(timeout = 10000)
public void validateSuccessfulPublishAndTransferToSuccess() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
@ -60,7 +60,7 @@ public class PublishJMSTest {
attributes.put("foo", "foo");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it.
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
@ -72,6 +72,8 @@ public class PublishJMSTest {
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
runner.run(1, true); // Run once just so that we can trigger the shutdown of the Connection Factory
}
@Test
@ -96,7 +98,7 @@ public class PublishJMSTest {
attributes.put("foo", "foo");
attributes.put(JmsHeaders.REPLY_TO, "cooQueue");
runner.enqueue("Hey dude!".getBytes(), attributes);
runner.run(1, false);
runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it.
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
@ -108,6 +110,8 @@ public class PublishJMSTest {
assertEquals("Hey dude!", new String(messageBytes));
assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName());
assertEquals("foo", message.getStringProperty("foo"));
runner.run(1, true); // Run once just so that we can trigger the shutdown of the Connection Factory
}
@Test