diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 39118a75cb..2758bfe013 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -18,9 +18,13 @@ package org.apache.nifi.jms.processors; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.ConnectionFactory; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; @@ -38,9 +42,7 @@ import org.springframework.jms.core.JmsTemplate; /** * Base JMS processor to support implementation of JMS producers and consumers. * - * @param - * the type of {@link JMSWorker} which could be {@link JMSPublisher} - * or {@link JMSConsumer} + * @param the type of {@link JMSWorker} which could be {@link JMSPublisher} or {@link JMSConsumer} * @see PublishJMS * @see ConsumeJMS * @see JMSConnectionFactoryProviderDefinition @@ -48,7 +50,6 @@ import org.springframework.jms.core.JmsTemplate; abstract class AbstractJMSProcessor extends AbstractProcessor { static final String QUEUE = "QUEUE"; - static final String TOPIC = "TOPIC"; static final PropertyDescriptor USER = new PropertyDescriptor.Builder() @@ -90,14 +91,13 @@ abstract class AbstractJMSProcessor extends AbstractProcess .build(); static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder() .name("Session Cache size") - .description("The maximum limit for the number of cached Sessions.") - .required(true) + .description("This property is deprecated and no longer has any effect on the Processor. It will be removed in a later version.") + .required(false) .defaultValue("1") .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); - // ConnectionFactoryProvider ControllerService static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder() .name("Connection Factory Service") .description("The Controller Service that is used to obtain ConnectionFactory") @@ -106,11 +106,9 @@ abstract class AbstractJMSProcessor extends AbstractProcess .build(); static final List propertyDescriptors = new ArrayList<>(); + private volatile BlockingQueue workerPool; + private final AtomicInteger clientIdCounter = new AtomicInteger(1); - /* - * Will ensure that list of PropertyDescriptors is build only once, since - * all other lifecycle methods are invoked multiple times. - */ static { propertyDescriptors.add(CF_SERVICE); propertyDescriptors.add(DESTINATION); @@ -121,47 +119,35 @@ abstract class AbstractJMSProcessor extends AbstractProcess propertyDescriptors.add(SESSION_CACHE_SIZE); } - protected volatile T targetResource; - - private volatile CachingConnectionFactory cachingConnectionFactory; - - /** - * - */ @Override protected List getSupportedPropertyDescriptors() { return propertyDescriptors; } - /** - * Builds target resource ({@link JMSPublisher} or {@link JMSConsumer}) upon - * first invocation while delegating to the sub-classes ( {@link PublishJMS} - * or {@link ConsumeJMS}) via - * {@link #rendezvousWithJms(ProcessContext, ProcessSession)} method. - */ + @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - synchronized (this) { - this.buildTargetResource(context); + T worker = workerPool.poll(); + if (worker == null) { + worker = buildTargetResource(context); } - this.rendezvousWithJms(context, session); + + rendezvousWithJms(context, session, worker); + workerPool.offer(worker); } - /** - * Will destroy the instance of {@link CachingConnectionFactory} and sets - * 'targetResource' to null; - */ + @OnScheduled + public void setupWorkerPool(final ProcessContext context) { + workerPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); + } + + @OnStopped public void close() { - if (this.cachingConnectionFactory != null) { - this.cachingConnectionFactory.destroy(); + T worker; + while ((worker = workerPool.poll()) != null) { + worker.shutdown(); } - this.targetResource = null; - } - - @Override - public String toString() { - return this.getClass().getSimpleName() + " - " + this.targetResource; } /** @@ -169,23 +155,16 @@ abstract class AbstractJMSProcessor extends AbstractProcess * {@link #onTrigger(ProcessContext, ProcessSession)} operation. It is * implemented by sub-classes to perform {@link Processor} specific * functionality. - * - * @param context - * instance of {@link ProcessContext} - * @param session - * instance of {@link ProcessSession} */ - protected abstract void rendezvousWithJms(ProcessContext context, ProcessSession session) throws ProcessException; + protected abstract void rendezvousWithJms(ProcessContext context, ProcessSession session, T jmsWorker) throws ProcessException; /** * Finishes building one of the {@link JMSWorker} subclasses T. * - * @param jmsTemplate instance of {@link JmsTemplate} - * * @see JMSPublisher * @see JMSConsumer */ - protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext); + protected abstract T finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext); /** * This method essentially performs initialization of this Processor by @@ -195,30 +174,30 @@ abstract class AbstractJMSProcessor extends AbstractProcess * in an instance of the {@link CachingConnectionFactory} used to construct * {@link JmsTemplate} used by this Processor. */ - private void buildTargetResource(ProcessContext context) { - if (this.targetResource == null) { - JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class); - ConnectionFactory connectionFactory = cfProvider.getConnectionFactory(); + private T buildTargetResource(ProcessContext context) { + final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class); + final ConnectionFactory connectionFactory = cfProvider.getConnectionFactory(); - UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new UserCredentialsConnectionFactoryAdapter(); - cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory); - cfCredentialsAdapter.setUsername(context.getProperty(USER).getValue()); - cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue()); + final UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new UserCredentialsConnectionFactoryAdapter(); + cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory); + cfCredentialsAdapter.setUsername(context.getProperty(USER).getValue()); + cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue()); - this.cachingConnectionFactory = new CachingConnectionFactory(cfCredentialsAdapter); - this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue())); - String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue(); - if (clientId != null) { - this.cachingConnectionFactory.setClientId(clientId); - } - JmsTemplate jmsTemplate = new JmsTemplate(); - jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); - jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); + final CachingConnectionFactory cachingFactory = new CachingConnectionFactory(cfCredentialsAdapter); - // set of properties that may be good candidates for exposure via configuration - jmsTemplate.setReceiveTimeout(1000); - - this.targetResource = this.finishBuildingTargetResource(jmsTemplate, context); + String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue(); + if (clientId != null) { + clientId = clientId + "-" + clientIdCounter.getAndIncrement(); + cachingFactory.setClientId(clientId); } + + JmsTemplate jmsTemplate = new JmsTemplate(); + jmsTemplate.setConnectionFactory(cachingFactory); + jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); + + // set of properties that may be good candidates for exposure via configuration + jmsTemplate.setReceiveTimeout(1000); + + return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context); } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index 87743974c7..a199411a0b 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.jms.processors; -import java.io.IOException; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -44,8 +42,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; /** @@ -146,35 +144,34 @@ public class ConsumeJMS extends AbstractJMSProcessor { * 'success' {@link Relationship}. */ @Override - protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException { + protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException { final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); final boolean durable = durableBoolean == null ? false : durableBoolean; final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); final boolean shared = sharedBoolean == null ? false : sharedBoolean; final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue(); - this.targetResource.consume(destinationName, durable, shared, subscriptionName, new ConsumerCallback(){ + + consumer.consume(destinationName, durable, shared, subscriptionName, new ConsumerCallback() { @Override public void accept(final JMSResponse response) { - if (response != null){ - FlowFile flowFile = processSession.create(); - flowFile = processSession.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(response.getMessageBody()); - } - }); - Map jmsHeaders = response.getMessageHeaders(); - Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties()); - flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); - flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); - flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName); - processSession.getProvenanceReporter().receive(flowFile, destinationName); - processSession.transfer(flowFile, REL_SUCCESS); - processSession.commit(); - } else { - context.yield(); + if (response == null) { + return; } + + FlowFile flowFile = processSession.create(); + flowFile = processSession.write(flowFile, out -> out.write(response.getMessageBody())); + + final Map jmsHeaders = response.getMessageHeaders(); + final Map jmsProperties = response.getMessageProperties(); + + flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); + flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); + flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName); + + processSession.getProvenanceReporter().receive(flowFile, destinationName); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); } }); } @@ -183,23 +180,17 @@ public class ConsumeJMS extends AbstractJMSProcessor { * Will create an instance of {@link JMSConsumer} */ @Override - protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) { + protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) { int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger(); jmsTemplate.setSessionAcknowledgeMode(ackMode); - return new JMSConsumer(jmsTemplate, this.getLogger()); + return new JMSConsumer(connectionFactory, jmsTemplate, this.getLogger()); } - /** - * - */ @Override public Set getRelationships() { return relationships; } - /** - * - */ @Override protected List getSupportedPropertyDescriptors() { return thisPropertyDescriptors; @@ -211,11 +202,12 @@ public class ConsumeJMS extends AbstractJMSProcessor { * copied values of JMS attributes will be "stringified" via * String.valueOf(attribute). */ - private FlowFile updateFlowFileAttributesWithJMSAttributes(Map jmsAttributes, FlowFile flowFile, ProcessSession processSession) { - Map attributes = new HashMap(); - for (Entry entry : jmsAttributes.entrySet()) { - attributes.put(entry.getKey(), String.valueOf(entry.getValue())); + private FlowFile updateFlowFileAttributesWithJMSAttributes(Map jmsAttributes, FlowFile flowFile, ProcessSession processSession) { + Map attributes = new HashMap<>(); + for (Entry entry : jmsAttributes.entrySet()) { + attributes.put(entry.getKey(), entry.getValue()); } + flowFile = processSession.putAllAttributes(flowFile, attributes); return flowFile; } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java index a4fc47a1c3..841b62df4e 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -33,6 +33,7 @@ import javax.jms.Topic; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; +import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.SessionCallback; import org.springframework.jms.support.JmsHeaders; @@ -43,63 +44,54 @@ import org.springframework.jms.support.JmsUtils; */ final class JMSConsumer extends JMSWorker { - /** - * Creates an instance of this consumer - * - * @param jmsTemplate - * instance of {@link JmsTemplate} - * @param processLog - * instance of {@link ComponentLog} - */ - JMSConsumer(JmsTemplate jmsTemplate, ComponentLog processLog) { - super(jmsTemplate, processLog); - if (this.processLog.isInfoEnabled()) { - this.processLog.info("Created Message Consumer for '" + jmsTemplate.toString() + "'."); + JMSConsumer(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog logger) { + super(connectionFactory, jmsTemplate, logger); + logger.debug("Created Message Consumer for '{}'", new Object[] {jmsTemplate}); + } + + + private MessageConsumer createMessageConsumer(final Session session, final String destinationName, final boolean durable, final boolean shared, final String subscriberName) throws JMSException { + final boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain(); + final Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, destinationName, isPubSub); + + if (isPubSub) { + if (shared) { + try { + if (durable) { + return session.createSharedDurableConsumer((Topic) destination, subscriberName); + } else { + return session.createSharedConsumer((Topic) destination, subscriberName); + } + } catch (AbstractMethodError e) { + throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e); + } + } else { + if (durable) { + return session.createDurableConsumer((Topic) destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); + } else { + return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); + } + } + } else { + return session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); } } - /** - * - */ public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final ConsumerCallback consumerCallback) { this.jmsTemplate.execute(new SessionCallback() { @Override - public Void doInJms(Session session) throws JMSException { - /* - * We need to call recover to ensure that in in the event of - * abrupt end or exception the current session will stop message - * delivery and restarts with the oldest unacknowledged message - */ + public Void doInJms(final Session session) throws JMSException { + // We need to call recover to ensure that in in the event of + // abrupt end or exception the current session will stop message + // delivery and restarts with the oldest unacknowledged message session.recover(); - boolean isPubSub = JMSConsumer.this.jmsTemplate.isPubSubDomain(); - Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName( - session, destinationName, isPubSub); - MessageConsumer msgConsumer; - if (isPubSub) { - if (shared) { - try { - if (durable) { - msgConsumer = session.createSharedDurableConsumer((Topic)destination, subscriberName); - } else { - msgConsumer = session.createSharedConsumer((Topic)destination, subscriberName); - } - } catch (AbstractMethodError e) { - throw new ProcessException("Failed to create a shared consumer. Make sure the target broker is JMS 2.0 compliant.", e); - } - } else { - if (durable) { - msgConsumer = session.createDurableConsumer((Topic)destination, subscriberName, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); - } else { - msgConsumer = session.createConsumer((Topic)destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); - } - } - } else { - msgConsumer = session.createConsumer(destination, null, JMSConsumer.this.jmsTemplate.isPubSubDomain()); - } - Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); - JMSResponse response = null; + + final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName); try { + final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); + JMSResponse response = null; + if (message != null) { byte[] messageBody = null; if (message instanceof TextMessage) { @@ -108,12 +100,14 @@ final class JMSConsumer extends JMSWorker { messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); } else { throw new IllegalStateException("Message type other then TextMessage and BytesMessage are " - + "not supported at the moment"); + + "not supported at the moment"); } - Map messageHeaders = extractMessageHeaders(message); - Map messageProperties = extractMessageProperties(message); + + final Map messageHeaders = extractMessageHeaders(message); + final Map messageProperties = extractMessageProperties(message); response = new JMSResponse(messageBody, messageHeaders, messageProperties); } + // invoke the processor callback (regardless if it's null, // so the processor can yield) as part of this inJMS call // and ACK message *only* after its successful invocation @@ -125,19 +119,18 @@ final class JMSConsumer extends JMSWorker { } finally { JmsUtils.closeMessageConsumer(msgConsumer); } + return null; } }, true); } - /** - * - */ + @SuppressWarnings("unchecked") - private Map extractMessageProperties(Message message) { - Map properties = new HashMap<>(); + private Map extractMessageProperties(final Message message) { + final Map properties = new HashMap<>(); try { - Enumeration propertyNames = message.getPropertyNames(); + final Enumeration propertyNames = message.getPropertyNames(); while (propertyNames.hasMoreElements()) { String propertyName = propertyNames.nextElement(); properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName))); @@ -148,41 +141,33 @@ final class JMSConsumer extends JMSWorker { return properties; } - /** - * - * - */ - private Map extractMessageHeaders(Message message) { - // even though all values are Strings in current impl, it may change in - // the future, so keeping it - Map messageHeaders = new HashMap<>(); - try { - messageHeaders.put(JmsHeaders.DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode())); - messageHeaders.put(JmsHeaders.EXPIRATION, String.valueOf(message.getJMSExpiration())); - messageHeaders.put(JmsHeaders.PRIORITY, String.valueOf(message.getJMSPriority())); - messageHeaders.put(JmsHeaders.REDELIVERED, String.valueOf(message.getJMSRedelivered())); - messageHeaders.put(JmsHeaders.TIMESTAMP, String.valueOf(message.getJMSTimestamp())); - messageHeaders.put(JmsHeaders.CORRELATION_ID, message.getJMSCorrelationID()); - messageHeaders.put(JmsHeaders.MESSAGE_ID, message.getJMSMessageID()); - messageHeaders.put(JmsHeaders.TYPE, message.getJMSType()); - String replyToDestinationName = this.retrieveDestinationName(message.getJMSReplyTo(), JmsHeaders.REPLY_TO); - if (replyToDestinationName != null) { - messageHeaders.put(JmsHeaders.REPLY_TO, replyToDestinationName); - } - String destinationName = this.retrieveDestinationName(message.getJMSDestination(), JmsHeaders.DESTINATION); - if (destinationName != null) { - messageHeaders.put(JmsHeaders.DESTINATION, destinationName); - } - } catch (Exception e) { - throw new IllegalStateException("Failed to extract JMS Headers", e); + private Map extractMessageHeaders(final Message message) throws JMSException { + final Map messageHeaders = new HashMap<>(); + + messageHeaders.put(JmsHeaders.DELIVERY_MODE, String.valueOf(message.getJMSDeliveryMode())); + messageHeaders.put(JmsHeaders.EXPIRATION, String.valueOf(message.getJMSExpiration())); + messageHeaders.put(JmsHeaders.PRIORITY, String.valueOf(message.getJMSPriority())); + messageHeaders.put(JmsHeaders.REDELIVERED, String.valueOf(message.getJMSRedelivered())); + messageHeaders.put(JmsHeaders.TIMESTAMP, String.valueOf(message.getJMSTimestamp())); + messageHeaders.put(JmsHeaders.CORRELATION_ID, message.getJMSCorrelationID()); + messageHeaders.put(JmsHeaders.MESSAGE_ID, message.getJMSMessageID()); + messageHeaders.put(JmsHeaders.TYPE, message.getJMSType()); + + String replyToDestinationName = this.retrieveDestinationName(message.getJMSReplyTo(), JmsHeaders.REPLY_TO); + if (replyToDestinationName != null) { + messageHeaders.put(JmsHeaders.REPLY_TO, replyToDestinationName); } + + String destinationName = this.retrieveDestinationName(message.getJMSDestination(), JmsHeaders.DESTINATION); + if (destinationName != null) { + messageHeaders.put(JmsHeaders.DESTINATION, destinationName); + } + return messageHeaders; } - /** - * - */ + private String retrieveDestinationName(Destination destination, String headerName) { String destinationName = null; if (destination != null) { @@ -196,17 +181,14 @@ final class JMSConsumer extends JMSWorker { return destinationName; } - /** - * - */ + static class JMSResponse { private final byte[] messageBody; - private final Map messageHeaders; - + private final Map messageHeaders; private final Map messageProperties; - JMSResponse(byte[] messageBody, Map messageHeaders, Map messageProperties) { + JMSResponse(byte[] messageBody, Map messageHeaders, Map messageProperties) { this.messageBody = messageBody; this.messageHeaders = Collections.unmodifiableMap(messageHeaders); this.messageProperties = Collections.unmodifiableMap(messageProperties); @@ -216,7 +198,7 @@ final class JMSConsumer extends JMSWorker { return this.messageBody; } - public Map getMessageHeaders() { + public Map getMessageHeaders() { return this.messageHeaders; } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index 49e3354069..671f5c9640 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -27,10 +27,8 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.SessionCallback; @@ -41,45 +39,22 @@ import org.springframework.jms.support.JmsHeaders; */ final class JMSPublisher extends JMSWorker { - private final static Logger logger = LoggerFactory.getLogger(JMSPublisher.class); - - - /** - * Creates an instance of this publisher - * - * @param jmsTemplate - * instance of {@link JmsTemplate} - * @param processLog - * instance of {@link ComponentLog} - */ - JMSPublisher(JmsTemplate jmsTemplate, ComponentLog processLog) { - super(jmsTemplate, processLog); - if (logger.isInfoEnabled()) { - logger.info("Created Message Publisher for '" + jmsTemplate.toString() + "'."); - } + JMSPublisher(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) { + super(connectionFactory, jmsTemplate, processLog); + processLog.debug("Created Message Publisher for {}", new Object[] {jmsTemplate}); } - /** - * - * @param messageBytes byte array representing contents of the message - */ void publish(String destinationName, byte[] messageBytes) { this.publish(destinationName, messageBytes, null); } - /** - * - * @param messageBytes - * byte array representing contents of the message - * @param flowFileAttributes - * Map representing {@link FlowFile} attributes. - */ void publish(final String destinationName, final byte[] messageBytes, final Map flowFileAttributes) { this.jmsTemplate.send(destinationName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { BytesMessage message = session.createBytesMessage(); message.writeBytes(messageBytes); + if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) { // set message headers and properties for (Entry entry : flowFileAttributes.entrySet()) { @@ -121,18 +96,12 @@ final class JMSPublisher extends JMSWorker { }); } - /** - * - */ + private void logUnbuildableDestination(String destinationName, String headerName) { - this.processLog.warn("Failed to determine destination type from destination name '" + destinationName - + "'. The '" - + headerName + "' will not be set."); + this.processLog.warn("Failed to determine destination type from destination name '{}'. The '{}' header will not be set.", new Object[] {destinationName, headerName}); } - /** - * - */ + private Destination buildDestination(final String destinationName) { Destination destination; if (destinationName.toLowerCase().contains("topic")) { @@ -152,6 +121,7 @@ final class JMSPublisher extends JMSWorker { } else { destination = null; } + return destination; } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java index b0e7087881..e6fa1bb071 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java @@ -21,6 +21,7 @@ import java.nio.channels.Channel; import javax.jms.Connection; import org.apache.nifi.logging.ComponentLog; +import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; @@ -33,8 +34,9 @@ import org.springframework.jms.core.JmsTemplate; abstract class JMSWorker { protected final JmsTemplate jmsTemplate; - protected final ComponentLog processLog; + private final CachingConnectionFactory connectionFactory; + /** * Creates an instance of this worker initializing it with JMS @@ -44,14 +46,16 @@ abstract class JMSWorker { * @param jmsTemplate the instance of {@link JmsTemplate} * @param processLog the instance of {@link ComponentLog} */ - public JMSWorker(JmsTemplate jmsTemplate, ComponentLog processLog) { + public JMSWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ComponentLog processLog) { + this.connectionFactory = connectionFactory; this.jmsTemplate = jmsTemplate; this.processLog = processLog; } - /** - * - */ + public void shutdown() { + connectionFactory.destroy(); + } + @Override public String toString() { return this.getClass().getSimpleName() + "[destination:" + this.jmsTemplate.getDefaultDestinationName() diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 24c3f250a3..80d6f3e305 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.jms.processors; -import java.io.IOException; -import java.io.InputStream; import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -37,8 +35,8 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.stream.io.StreamUtils; +import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; @@ -91,20 +89,19 @@ public class PublishJMS extends AbstractJMSProcessor { * Upon success the incoming {@link FlowFile} is transferred to the'success' * {@link Relationship} and upon failure FlowFile is penalized and * transferred to the 'failure' {@link Relationship} - * */ @Override - protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException { + protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession, JMSPublisher publisher) throws ProcessException { FlowFile flowFile = processSession.get(); if (flowFile != null) { try { String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue(); - this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); + publisher.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); processSession.transfer(flowFile, REL_SUCCESS); processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); } catch (Exception e) { processSession.transfer(flowFile, REL_FAILURE); - this.getLogger().error("Failed while sending message to JMS via " + this.targetResource, e); + this.getLogger().error("Failed while sending message to JMS via " + publisher, e); context.yield(); } } @@ -122,8 +119,8 @@ public class PublishJMS extends AbstractJMSProcessor { * Will create an instance of {@link JMSPublisher} */ @Override - protected JMSPublisher finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) { - return new JMSPublisher(jmsTemplate, this.getLogger()); + protected JMSPublisher finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) { + return new JMSPublisher(connectionFactory, jmsTemplate, this.getLogger()); } /** @@ -131,12 +128,7 @@ public class PublishJMS extends AbstractJMSProcessor { */ private byte[] extractMessageBody(FlowFile flowFile, ProcessSession session) { final byte[] messageContent = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, messageContent, true); - } - }); + session.read(flowFile, in -> StreamUtils.fillBuffer(in, messageContent, true)); return messageContent; } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java index 12ffe032cd..5e963d2907 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java @@ -52,13 +52,13 @@ public class CommonTest { } static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( - "vm://localhost?broker.persistent=false"); - connectionFactory = new CachingConnectionFactory(connectionFactory); + ConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + final ConnectionFactory connectionFactory = new CachingConnectionFactory(activeMqConnectionFactory); JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setPubSubDomain(pubSub); jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + jmsTemplate.setReceiveTimeout(10L); return jmsTemplate; } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java index e9364d28f8..23cf806ced 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java @@ -16,6 +16,10 @@ */ package org.apache.nifi.jms.processors; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.HashMap; import java.util.Map; @@ -29,45 +33,43 @@ import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class ConsumeJMSTest { @Test public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { final String destinationName = "cooQueue"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - JMSPublisher sender = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); - final Map senderAttributes = new HashMap<>(); - senderAttributes.put("filename", "message.txt"); - senderAttributes.put("attribute_from_sender", "some value"); - sender.publish(destinationName, "Hey dude!".getBytes(), senderAttributes); - TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); - JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); - when(cs.getIdentifier()).thenReturn("cfProvider"); - when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); - runner.addControllerService("cfProvider", cs); - runner.enableControllerService(cs); + try { + JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + final Map senderAttributes = new HashMap<>(); + senderAttributes.put("filename", "message.txt"); + senderAttributes.put("attribute_from_sender", "some value"); + sender.publish(destinationName, "Hey dude!".getBytes(), senderAttributes); + TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); - runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(ConsumeJMS.DESTINATION, destinationName); - runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); - runner.run(1, false); - // - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); - assertNotNull(successFF); - successFF.assertAttributeExists(JmsHeaders.DESTINATION); - successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName); - successFF.assertAttributeExists("filename"); - successFF.assertAttributeEquals("filename", "message.txt"); - successFF.assertAttributeExists("attribute_from_sender"); - successFF.assertAttributeEquals("attribute_from_sender", "some value"); - successFF.assertContentEquals("Hey dude!".getBytes()); - String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); - assertNotNull(sourceDestination); - - ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(ConsumeJMS.DESTINATION, destinationName); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); + runner.run(1, false); + // + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + successFF.assertAttributeExists(JmsHeaders.DESTINATION); + successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName); + successFF.assertAttributeExists("filename"); + successFF.assertAttributeEquals("filename", "message.txt"); + successFF.assertAttributeExists("attribute_from_sender"); + successFF.assertAttributeEquals("attribute_from_sender", "some value"); + successFF.assertContentEquals("Hey dude!".getBytes()); + String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); + assertNotNull(sourceDestination); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java index 0f8dafbe33..1a88b29b54 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java @@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.JMSException; @@ -48,16 +50,18 @@ public class JMSPublisherConsumerTest { final String destinationName = "testQueue"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); - publisher.publish(destinationName, "hellomq".getBytes()); + try { + JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + publisher.publish(destinationName, "hellomq".getBytes()); - Message receivedMessage = jmsTemplate.receive(destinationName); - assertTrue(receivedMessage instanceof BytesMessage); - byte[] bytes = new byte[7]; - ((BytesMessage) receivedMessage).readBytes(bytes); - assertEquals("hellomq", new String(bytes)); - - ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + Message receivedMessage = jmsTemplate.receive(destinationName); + assertTrue(receivedMessage instanceof BytesMessage); + byte[] bytes = new byte[7]; + ((BytesMessage) receivedMessage).readBytes(bytes); + assertEquals("hellomq", new String(bytes)); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } } @Test @@ -65,19 +69,22 @@ public class JMSPublisherConsumerTest { final String destinationName = "testQueue"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); - Map flowFileAttributes = new HashMap<>(); - flowFileAttributes.put("foo", "foo"); - flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic"); - publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes); + try { + JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + Map flowFileAttributes = new HashMap<>(); + flowFileAttributes.put("foo", "foo"); + flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic"); + publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes); - Message receivedMessage = jmsTemplate.receive(destinationName); - assertTrue(receivedMessage instanceof BytesMessage); - assertEquals("foo", receivedMessage.getStringProperty("foo")); - assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic); - assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName()); + Message receivedMessage = jmsTemplate.receive(destinationName); + assertTrue(receivedMessage instanceof BytesMessage); + assertEquals("foo", receivedMessage.getStringProperty("foo")); + assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic); + assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName()); - ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } } /** @@ -91,15 +98,15 @@ public class JMSPublisherConsumerTest { final String destinationName = "testQueue"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - jmsTemplate.send(destinationName, new MessageCreator() { - @Override - public Message createMessage(Session session) throws JMSException { - return session.createObjectMessage(); - } - }); - - JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); try { + jmsTemplate.send(destinationName, new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + return session.createObjectMessage(); + } + }); + + JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); consumer.consume(destinationName, false, false, null, new ConsumerCallback() { @Override public void accept(JMSResponse response) { @@ -116,102 +123,182 @@ public class JMSPublisherConsumerTest { final String destinationName = "testQueue"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - jmsTemplate.send(destinationName, new MessageCreator() { - @Override - public Message createMessage(Session session) throws JMSException { - TextMessage message = session.createTextMessage("hello from the other side"); - message.setStringProperty("foo", "foo"); - message.setBooleanProperty("bar", false); - message.setJMSReplyTo(session.createQueue("fooQueue")); - return message; - } - }); + try { + jmsTemplate.send(destinationName, new MessageCreator() { + @Override + public Message createMessage(Session session) throws JMSException { + TextMessage message = session.createTextMessage("hello from the other side"); + message.setStringProperty("foo", "foo"); + message.setBooleanProperty("bar", false); + message.setJMSReplyTo(session.createQueue("fooQueue")); + return message; + } + }); - JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); - final AtomicBoolean callbackInvoked = new AtomicBoolean(); - consumer.consume(destinationName, false, false, null, new ConsumerCallback() { - @Override - public void accept(JMSResponse response) { - callbackInvoked.set(true); - assertEquals("hello from the other side", new String(response.getMessageBody())); - assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO)); - assertEquals("foo", response.getMessageProperties().get("foo")); - assertEquals("false", response.getMessageProperties().get("bar")); - } - }); - assertTrue(callbackInvoked.get()); + JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + final AtomicBoolean callbackInvoked = new AtomicBoolean(); + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + callbackInvoked.set(true); + assertEquals("hello from the other side", new String(response.getMessageBody())); + assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO)); + assertEquals("foo", response.getMessageProperties().get("foo")); + assertEquals("false", response.getMessageProperties().get("bar")); + } + }); + assertTrue(callbackInvoked.get()); - ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } } - @Test + + @Test(timeout = 2000000) + public void testMultipleThreads() throws Exception { + String destinationName = "testQueue"; + JmsTemplate publishTemplate = CommonTest.buildJmsTemplateForDestination(false); + final CountDownLatch consumerTemplateCloseCount = new CountDownLatch(4); + + try { + JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) publishTemplate.getConnectionFactory(), publishTemplate, mock(ComponentLog.class)); + for (int i = 0; i < 4000; i++) { + publisher.publish(destinationName, String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + + final AtomicInteger msgCount = new AtomicInteger(0); + + final ConsumerCallback callback = new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + msgCount.incrementAndGet(); + } + }; + + final Thread[] threads = new Thread[4]; + for (int i = 0; i < 4; i++) { + final Thread t = new Thread(() -> { + JmsTemplate consumeTemplate = CommonTest.buildJmsTemplateForDestination(false); + + try { + JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class)); + + for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) { + consumer.consume(destinationName, false, false, null, callback); + } + } finally { + ((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy(); + consumerTemplateCloseCount.countDown(); + } + }); + + threads[i] = t; + t.start(); + } + + int iterations = 0; + while (msgCount.get() < 4000) { + Thread.sleep(10L); + if (++iterations % 100 == 0) { + System.out.println(msgCount.get() + " messages received so far"); + } + } + } finally { + ((CachingConnectionFactory) publishTemplate.getConnectionFactory()).destroy(); + + consumerTemplateCloseCount.await(); + } + } + + + @Test(timeout = 10000) public void validateMessageRedeliveryWhenNotAcked() throws Exception { String destinationName = "testQueue"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); - publisher.publish(destinationName, "1".getBytes(StandardCharsets.UTF_8)); - publisher.publish(destinationName, "2".getBytes(StandardCharsets.UTF_8)); - - JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); - final AtomicBoolean callbackInvoked = new AtomicBoolean(); try { - consumer.consume(destinationName, false, false, null, new ConsumerCallback() { - @Override - public void accept(JMSResponse response) { - callbackInvoked.set(true); - assertEquals("1", new String(response.getMessageBody())); - throw new RuntimeException("intentional to avoid explicit ack"); - } - }); - } catch (Exception e) { - // ignore - } - assertTrue(callbackInvoked.get()); - callbackInvoked.set(false); + JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + publisher.publish(destinationName, "1".getBytes(StandardCharsets.UTF_8)); + publisher.publish(destinationName, "2".getBytes(StandardCharsets.UTF_8)); - // should receive the same message, but will process it successfully - try { - consumer.consume(destinationName, false, false, null, new ConsumerCallback() { - @Override - public void accept(JMSResponse response) { - callbackInvoked.set(true); - assertEquals("1", new String(response.getMessageBody())); - } - }); - } catch (Exception e) { - // ignore - } - assertTrue(callbackInvoked.get()); - callbackInvoked.set(false); + JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + final AtomicBoolean callbackInvoked = new AtomicBoolean(); + try { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + callbackInvoked.set(true); + assertEquals("1", new String(response.getMessageBody())); + throw new RuntimeException("intentional to avoid explicit ack"); + } + }); + } catch (Exception e) { + // expected + } - // receiving next message and fail again - try { - consumer.consume(destinationName, false, false, null, new ConsumerCallback() { - @Override - public void accept(JMSResponse response) { - callbackInvoked.set(true); - assertEquals("2", new String(response.getMessageBody())); - throw new RuntimeException("intentional to avoid explicit ack"); - } - }); - } catch (Exception e) { - // ignore - } - assertTrue(callbackInvoked.get()); - callbackInvoked.set(false); + assertTrue(callbackInvoked.get()); + callbackInvoked.set(false); - // should receive the same message, but will process it successfully - try { - consumer.consume(destinationName, false, false, null, new ConsumerCallback() { - @Override - public void accept(JMSResponse response) { - callbackInvoked.set(true); - assertEquals("2", new String(response.getMessageBody())); + // should receive the same message, but will process it successfully + while (!callbackInvoked.get()) { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + if (response == null) { + return; + } + + callbackInvoked.set(true); + assertEquals("1", new String(response.getMessageBody())); + } + }); + } + + assertTrue(callbackInvoked.get()); + callbackInvoked.set(false); + + // receiving next message and fail again + try { + while (!callbackInvoked.get()) { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + if (response == null) { + return; + } + + callbackInvoked.set(true); + assertEquals("2", new String(response.getMessageBody())); + throw new RuntimeException("intentional to avoid explicit ack"); + } + }); } - }); - } catch (Exception e) { - // ignore + } catch (Exception e) { + // ignore + } + assertTrue(callbackInvoked.get()); + callbackInvoked.set(false); + + // should receive the same message, but will process it successfully + try { + while (!callbackInvoked.get()) { + consumer.consume(destinationName, false, false, null, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + if (response == null) { + return; + } + + callbackInvoked.set(true); + assertEquals("2", new String(response.getMessageBody())); + } + }); + } + } catch (Exception e) { + // ignore + } + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); } - ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java index f7ccf17ee7..1964ce9f64 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java @@ -39,7 +39,7 @@ import static org.mockito.Mockito.when; public class PublishJMSTest { - @Test + @Test(timeout = 10000) public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); @@ -60,7 +60,7 @@ public class PublishJMSTest { attributes.put("foo", "foo"); attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); runner.enqueue("Hey dude!".getBytes(), attributes); - runner.run(1, false); + runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); assertNotNull(successFF); @@ -72,6 +72,8 @@ public class PublishJMSTest { assertEquals("Hey dude!", new String(messageBytes)); assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); assertEquals("foo", message.getStringProperty("foo")); + + runner.run(1, true); // Run once just so that we can trigger the shutdown of the Connection Factory } @Test @@ -96,7 +98,7 @@ public class PublishJMSTest { attributes.put("foo", "foo"); attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); runner.enqueue("Hey dude!".getBytes(), attributes); - runner.run(1, false); + runner.run(1, false); // Run once but don't shut down because we want the Connection Factory left in tact so that we can use it. final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); assertNotNull(successFF); @@ -108,6 +110,8 @@ public class PublishJMSTest { assertEquals("Hey dude!", new String(messageBytes)); assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); assertEquals("foo", message.getStringProperty("foo")); + + runner.run(1, true); // Run once just so that we can trigger the shutdown of the Connection Factory } @Test