From ee14ae8af0740b4ca442b2662c48f422c53b144e Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 20 Sep 2016 10:49:22 -0400 Subject: [PATCH] NIFI-2774 added configurable QoS options to ConsumeJMS Signed-off-by: Mike Moser This closes #1036. --- .../jms/processors/AbstractJMSProcessor.java | 15 ++- .../nifi/jms/processors/ConsumeJMS.java | 95 +++++++++++++---- .../nifi/jms/processors/JMSConsumer.java | 84 ++++++++++----- .../nifi/jms/processors/JMSPublisher.java | 2 +- .../nifi/jms/processors/PublishJMS.java | 2 +- .../nifi/jms/processors/CommonTest.java | 10 +- .../processors/JMSPublisherConsumerTest.java | 100 ++++++++++++++++-- 7 files changed, 238 insertions(+), 70 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index cae3dc2631..d5b704b6c6 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -102,20 +102,18 @@ abstract class AbstractJMSProcessor extends AbstractProcess * all other lifecycle methods are invoked multiple times. */ static { - propertyDescriptors.add(USER); - propertyDescriptors.add(PASSWORD); + propertyDescriptors.add(CF_SERVICE); propertyDescriptors.add(DESTINATION); propertyDescriptors.add(DESTINATION_TYPE); + propertyDescriptors.add(USER); + propertyDescriptors.add(PASSWORD); propertyDescriptors.add(SESSION_CACHE_SIZE); - propertyDescriptors.add(CF_SERVICE); } protected volatile T targetResource; private volatile CachingConnectionFactory cachingConnectionFactory; - protected volatile String destinationName; - /** * */ @@ -176,7 +174,7 @@ abstract class AbstractJMSProcessor extends AbstractProcess * @see JMSPublisher * @see JMSConsumer */ - protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate); + protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext); /** * This method essentially performs initialization of this Processor by @@ -201,13 +199,12 @@ abstract class AbstractJMSProcessor extends AbstractProcess JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); - this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); // set of properties that may be good candidates for exposure via configuration - jmsTemplate.setReceiveTimeout(10000); + jmsTemplate.setReceiveTimeout(1000); - this.targetResource = this.finishBuildingTargetResource(jmsTemplate); + this.targetResource = this.finishBuildingTargetResource(jmsTemplate, context); } } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index b76a1a777d..d85d26f726 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -18,20 +18,27 @@ package org.apache.nifi.jms.processors; import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import javax.jms.Session; + import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; +import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback; import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -54,8 +61,31 @@ import org.springframework.jms.core.JmsTemplate; @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) public class ConsumeJMS extends AbstractJMSProcessor { + static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE), + "AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + ")", + "Automatically acknowledges a client's receipt of a message, regardless if NiFi session has been commited. " + + "Can result in data loss in the event where NiFi abruptly stopped before session was commited."); + + static final AllowableValue CLIENT_ACK = new AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE), + "CLIENT_ACKNOWLEDGE (" + String.valueOf(Session.CLIENT_ACKNOWLEDGE) + ")", + "(DEFAULT) Manually acknowledges a client's receipt of a message after NiFi Session was commited, thus ensuring no data loss"); + + static final AllowableValue DUPS_OK = new AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE), + "DUPS_OK_ACKNOWLEDGE (" + String.valueOf(Session.DUPS_OK_ACKNOWLEDGE) + ")", + "This acknowledgment mode instructs the session to lazily acknowledge the delivery of messages. May result in both data " + + "duplication and data loss while achieving the best throughput."); + public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination"; + static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new PropertyDescriptor.Builder() + .name("Acknowledgement Mode") + .description("The JMS Acknowledgement Mode. Using Auto Acknowledge can cause messages to be lost on restart of NiFi but may provide " + + "better performance than Client Acknowledge.") + .required(true) + .allowableValues(AUTO_ACK, CLIENT_ACK, DUPS_OK) + .defaultValue(CLIENT_ACK.getValue()) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles that are received from the JMS Destination are routed to this relationship") @@ -63,7 +93,14 @@ public class ConsumeJMS extends AbstractJMSProcessor { private final static Set relationships; + private final static List thisPropertyDescriptors; + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(propertyDescriptors); + _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE); + thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + Set _relationships = new HashSet<>(); _relationships.add(REL_SUCCESS); relationships = Collections.unmodifiableSet(_relationships); @@ -77,33 +114,41 @@ public class ConsumeJMS extends AbstractJMSProcessor { * 'success' {@link Relationship}. */ @Override - protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException { + protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession) throws ProcessException { final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); - final JMSResponse response = this.targetResource.consume(destinationName); - if (response != null){ - FlowFile flowFile = processSession.create(); - flowFile = processSession.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(response.getMessageBody()); + this.targetResource.consume(destinationName, new ConsumerCallback(){ + @Override + public void accept(final JMSResponse response) { + if (response != null){ + FlowFile flowFile = processSession.create(); + flowFile = processSession.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(response.getMessageBody()); + } + }); + Map jmsHeaders = response.getMessageHeaders(); + Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties()); + flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); + flowFile = ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); + flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName); + processSession.getProvenanceReporter().receive(flowFile, destinationName); + processSession.transfer(flowFile, REL_SUCCESS); + processSession.commit(); + } else { + context.yield(); } - }); - Map jmsHeaders = response.getMessageHeaders(); - Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties()); - flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, processSession); - flowFile = this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, processSession); - processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); - processSession.transfer(flowFile, REL_SUCCESS); - } else { - context.yield(); - } + } + }); } /** * Will create an instance of {@link JMSConsumer} */ @Override - protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate) { + protected JMSConsumer finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) { + int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger(); + jmsTemplate.setSessionAcknowledgeMode(ackMode); return new JMSConsumer(jmsTemplate, this.getLogger()); } @@ -115,19 +160,25 @@ public class ConsumeJMS extends AbstractJMSProcessor { return relationships; } + /** + * + */ + @Override + protected List getSupportedPropertyDescriptors() { + return thisPropertyDescriptors; + } + /** * Copies JMS attributes (i.e., headers and properties) as FF attributes. * Given that FF attributes mandate that values are of type String, the * copied values of JMS attributes will be "stringified" via * String.valueOf(attribute). */ - private FlowFile updateFlowFileAttributesWithJMSAttributes(Map jmsAttributes, FlowFile flowFile, - ProcessSession processSession) { + private FlowFile updateFlowFileAttributesWithJMSAttributes(Map jmsAttributes, FlowFile flowFile, ProcessSession processSession) { Map attributes = new HashMap(); for (Entry entry : jmsAttributes.entrySet()) { attributes.put(entry.getKey(), String.valueOf(entry.getValue())); } - attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName); flowFile = processSession.putAllAttributes(flowFile, attributes); return flowFile; } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java index 0ddbe486b8..e955236d31 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -25,23 +25,23 @@ import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.Queue; +import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.nifi.logging.ComponentLog; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.SessionCallback; import org.springframework.jms.support.JmsHeaders; +import org.springframework.jms.support.JmsUtils; /** * Generic consumer of messages from JMS compliant messaging system. */ final class JMSConsumer extends JMSWorker { - private final static Logger logger = LoggerFactory.getLogger(JMSConsumer.class); - /** * Creates an instance of this consumer * @@ -52,8 +52,8 @@ final class JMSConsumer extends JMSWorker { */ JMSConsumer(JmsTemplate jmsTemplate, ComponentLog processLog) { super(jmsTemplate, processLog); - if (logger.isInfoEnabled()) { - logger.info("Created Message Consumer for '" + jmsTemplate.toString() + "'."); + if (this.processLog.isInfoEnabled()) { + this.processLog.info("Created Message Consumer for '" + jmsTemplate.toString() + "'."); } } @@ -61,27 +61,51 @@ final class JMSConsumer extends JMSWorker { /** * */ - public JMSResponse consume(String destinationName) { - Message message = this.jmsTemplate.receive(destinationName); - if (message != null) { - byte[] messageBody = null; - try { - if (message instanceof TextMessage) { - messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message); - } else if (message instanceof BytesMessage) { - messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); - } else { - throw new UnsupportedOperationException("Message type other then TextMessage and BytesMessage are " - + "not supported at the moment"); + public void consume(final String destinationName, final ConsumerCallback consumerCallback) { + this.jmsTemplate.execute(new SessionCallback() { + @Override + public Void doInJms(Session session) throws JMSException { + /* + * We need to call recover to ensure that in in the event of + * abrupt end or exception the current session will stop message + * delivery and restarts with the oldest unacknowledged message + */ + session.recover(); + Destination destination = JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName( + session, destinationName, JMSConsumer.this.jmsTemplate.isPubSubDomain()); + MessageConsumer msgConsumer = session.createConsumer(destination, null, + JMSConsumer.this.jmsTemplate.isPubSubDomain()); + Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout()); + JMSResponse response = null; + try { + if (message != null) { + byte[] messageBody = null; + if (message instanceof TextMessage) { + messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message); + } else if (message instanceof BytesMessage) { + messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); + } else { + throw new IllegalStateException("Message type other then TextMessage and BytesMessage are " + + "not supported at the moment"); + } + Map messageHeaders = extractMessageHeaders(message); + Map messageProperties = extractMessageProperties(message); + response = new JMSResponse(messageBody, messageHeaders, messageProperties); + } + // invoke the processor callback (regardless if it's null, + // so the processor can yield) as part of this inJMS call + // and ACK message *only* after its successful invocation + // and if CLIENT_ACKNOWLEDGE is set. + consumerCallback.accept(response); + if (message != null && session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { + message.acknowledge(); + } + } finally { + JmsUtils.closeMessageConsumer(msgConsumer); } - Map messageHeaders = this.extractMessageHeaders(message); - Map messageProperties = this.extractMessageProperties(message); - return new JMSResponse(messageBody, messageHeaders, messageProperties); - } catch (Exception e) { - throw new IllegalStateException(e); + return null; } - } - return null; + }, true); } /** @@ -97,7 +121,6 @@ final class JMSConsumer extends JMSWorker { properties.put(propertyName, String.valueOf(message.getObjectProperty(propertyName))); } } catch (JMSException e) { - logger.warn("Failed to extract message properties", e); this.processLog.warn("Failed to extract message properties", e); } return properties; @@ -145,7 +168,6 @@ final class JMSConsumer extends JMSWorker { destinationName = (destination instanceof Queue) ? ((Queue) destination).getQueueName() : ((Topic) destination).getTopicName(); } catch (JMSException e) { - logger.warn("Failed to retrieve Destination name for '" + headerName + "' header", e); this.processLog.warn("Failed to retrieve Destination name for '" + headerName + "' header", e); } } @@ -180,4 +202,12 @@ final class JMSConsumer extends JMSWorker { return messageProperties; } } + + /** + * Callback to be invoked while executing inJMS call (the call within the + * live JMS session) + */ + static interface ConsumerCallback { + void accept(JMSResponse response); + } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index 526f0bd1bb..49e3354069 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -74,7 +74,7 @@ final class JMSPublisher extends JMSWorker { * @param flowFileAttributes * Map representing {@link FlowFile} attributes. */ - void publish(final String destinationName, final byte[] messageBytes, Map flowFileAttributes) { + void publish(final String destinationName, final byte[] messageBytes, final Map flowFileAttributes) { this.jmsTemplate.send(destinationName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 1f7a44e4b3..24c3f250a3 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -122,7 +122,7 @@ public class PublishJMS extends AbstractJMSProcessor { * Will create an instance of {@link JMSPublisher} */ @Override - protected JMSPublisher finishBuildingTargetResource(JmsTemplate jmsTemplate) { + protected JMSPublisher finishBuildingTargetResource(JmsTemplate jmsTemplate, ProcessContext processContext) { return new JMSPublisher(jmsTemplate, this.getLogger()); } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java index 8a69a1487f..12ffe032cd 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java @@ -21,6 +21,9 @@ import static org.junit.Assert.assertTrue; import java.util.Iterator; import java.util.ServiceLoader; +import javax.jms.ConnectionFactory; +import javax.jms.Session; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.nifi.processor.Processor; import org.junit.Test; @@ -49,12 +52,13 @@ public class CommonTest { } static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm://localhost?broker.persistent=false"); - CachingConnectionFactory cf = new CachingConnectionFactory(connectionFactory); + connectionFactory = new CachingConnectionFactory(connectionFactory); - JmsTemplate jmsTemplate = new JmsTemplate(cf); + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setPubSubDomain(pubSub); + jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); return jmsTemplate; } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java index be097fe41d..3168443240 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java @@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.JMSException; @@ -30,6 +32,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback; import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; import org.apache.nifi.logging.ComponentLog; import org.junit.Test; @@ -41,7 +44,7 @@ import org.springframework.jms.support.JmsHeaders; public class JMSPublisherConsumerTest { @Test - public void validateByesConvertedToBytesMessageOnSend() throws Exception { + public void validateBytesConvertedToBytesMessageOnSend() throws Exception { final String destinationName = "testQueue"; JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); @@ -97,7 +100,12 @@ public class JMSPublisherConsumerTest { JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); try { - consumer.consume(destinationName); + consumer.consume(destinationName, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + // noop + } + }); } finally { ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); } @@ -120,12 +128,90 @@ public class JMSPublisherConsumerTest { }); JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); - JMSResponse response = consumer.consume(destinationName); - assertEquals("hello from the other side", new String(response.getMessageBody())); - assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO)); - assertEquals("foo", response.getMessageProperties().get("foo")); - assertEquals("false", response.getMessageProperties().get("bar")); + final AtomicBoolean callbackInvoked = new AtomicBoolean(); + consumer.consume(destinationName, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + callbackInvoked.set(true); + assertEquals("hello from the other side", new String(response.getMessageBody())); + assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO)); + assertEquals("foo", response.getMessageProperties().get("foo")); + assertEquals("false", response.getMessageProperties().get("bar")); + } + }); + assertTrue(callbackInvoked.get()); ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); } + + @Test + public void validateMessageRedeliveryWhenNotAcked() throws Exception { + String destinationName = "testQueue"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); + JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); + publisher.publish(destinationName, "1".getBytes(StandardCharsets.UTF_8)); + publisher.publish(destinationName, "2".getBytes(StandardCharsets.UTF_8)); + + JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); + final AtomicBoolean callbackInvoked = new AtomicBoolean(); + try { + consumer.consume(destinationName, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + callbackInvoked.set(true); + assertEquals("1", new String(response.getMessageBody())); + throw new RuntimeException("intentional to avoid explicit ack"); + } + }); + } catch (Exception e) { + // ignore + } + assertTrue(callbackInvoked.get()); + callbackInvoked.set(false); + + // should receive the same message, but will process it successfully + try { + consumer.consume(destinationName, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + callbackInvoked.set(true); + assertEquals("1", new String(response.getMessageBody())); + } + }); + } catch (Exception e) { + // ignore + } + assertTrue(callbackInvoked.get()); + callbackInvoked.set(false); + + // receiving next message and fail again + try { + consumer.consume(destinationName, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + callbackInvoked.set(true); + assertEquals("2", new String(response.getMessageBody())); + throw new RuntimeException("intentional to avoid explicit ack"); + } + }); + } catch (Exception e) { + // ignore + } + assertTrue(callbackInvoked.get()); + callbackInvoked.set(false); + + // should receive the same message, but will process it successfully + try { + consumer.consume(destinationName, new ConsumerCallback() { + @Override + public void accept(JMSResponse response) { + callbackInvoked.set(true); + assertEquals("2", new String(response.getMessageBody())); + } + }); + } catch (Exception e) { + // ignore + } + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } }