From a0c28ee19e1b29b30e17946f074a01f94cfbc3c0 Mon Sep 17 00:00:00 2001 From: Mark Bean Date: Fri, 5 Oct 2018 14:08:28 +0000 Subject: [PATCH] NIFI-5660: JMSPublisher should set some header properties in JmsTemplate instead of directly in the message NIFI-5660: Added lines to integration test to verify these header properties (added by Mike Moser) Signed-off-by: Mike Moser This closes #3053 --- .../apache/nifi/jms/processors/AbstractJMSProcessor.java | 5 +++++ .../org/apache/nifi/jms/processors/JMSPublisher.java | 9 ++++++--- .../nifi/jms/processors/JMSPublisherConsumerIT.java | 4 ++++ 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index f47cf784da..e51238dd79 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -33,6 +33,7 @@ import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapte import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; +import javax.jms.Message; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -177,6 +178,10 @@ abstract class AbstractJMSProcessor extends AbstractProcess } } if (worker != null) { + worker.jmsTemplate.setExplicitQosEnabled(false); + worker.jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE); + worker.jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); + worker.jmsTemplate.setPriority(Message.DEFAULT_PRIORITY); workerPool.offer(worker); } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index c13f4b71b0..506de49db4 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -84,11 +84,14 @@ final class JMSPublisher extends JMSWorker { for (Entry entry : flowFileAttributesToSend.entrySet()) { try { if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) { - message.setJMSDeliveryMode(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setExplicitQosEnabled(true); } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) { - message.setJMSExpiration(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setTimeToLive(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setExplicitQosEnabled(true); } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) { - message.setJMSPriority(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue())); + this.jmsTemplate.setExplicitQosEnabled(true); } else if (entry.getKey().equals(JmsHeaders.REDELIVERED)) { message.setJMSRedelivered(Boolean.parseBoolean(entry.getValue())); } else if (entry.getKey().equals(JmsHeaders.TIMESTAMP)) { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java index 7812e7185a..12474ecd80 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java @@ -77,6 +77,8 @@ public class JMSPublisherConsumerIT { flowFileAttributes.put("illegal-property", "value"); flowFileAttributes.put("another.illegal", "value"); flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic"); + flowFileAttributes.put(JmsHeaders.DELIVERY_MODE, "1"); + flowFileAttributes.put(JmsHeaders.PRIORITY, "1"); flowFileAttributes.put(JmsHeaders.EXPIRATION, "never"); // value expected to be integer, make sure non-integer doesn't cause problems publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes); @@ -86,6 +88,8 @@ public class JMSPublisherConsumerIT { assertFalse(receivedMessage.propertyExists("illegal-property")); assertFalse(receivedMessage.propertyExists("another.illegal")); assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic); + assertEquals(1, receivedMessage.getJMSDeliveryMode()); + assertEquals(1, receivedMessage.getJMSPriority()); assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName()); } finally {