diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index 67a49a9dd6..d5892f013b 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.jms.processors; +import org.apache.commons.lang3.math.NumberUtils; import org.apache.nifi.logging.ComponentLog; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; @@ -28,6 +29,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; +import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -79,8 +81,25 @@ class JMSPublisher extends JMSWorker { this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue())); this.jmsTemplate.setExplicitQosEnabled(true); } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) { - this.jmsTemplate.setTimeToLive(Integer.parseInt(entry.getValue())); - this.jmsTemplate.setExplicitQosEnabled(true); + if(NumberUtils.isCreatable(entry.getValue())) { //ignore any non-numeric values + long expiration = Long.parseLong(entry.getValue()); + long ttl = 0L; + + // if expiration was set to a positive non-zero value, then calculate the ttl + // jmsTemplate does not have an expiration field, and can only accept a ttl value + // which is then used to set jms_expiration header + // ttl is in epoch millis + if(expiration > 0) { + ttl = expiration - Instant.now().toEpochMilli(); + if(ttl > 0) { + this.jmsTemplate.setTimeToLive(ttl); + this.jmsTemplate.setExplicitQosEnabled(true); + } // else, use default ttl + } else if (expiration == 0) { // expiration == 0 means no expiration in jms + this.jmsTemplate.setTimeToLive(0); //ttl must be explicitly set to 0 to indicate no expiration + this.jmsTemplate.setExplicitQosEnabled(true); + } // else, use default ttl + } } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) { this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue())); this.jmsTemplate.setExplicitQosEnabled(true); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 8fea7dd663..1e37f6aea1 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -85,7 +85,7 @@ import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceE + "FlowFile attributes will be added as JMS headers and/or properties to the outgoing JMS message.") @ReadsAttributes({ @ReadsAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = "This attribute becomes the JMSDeliveryMode message header. Must be an integer."), - @ReadsAttribute(attribute = JmsHeaders.EXPIRATION, description = "This attribute becomes the JMSExpiration message header. Must be an integer."), + @ReadsAttribute(attribute = JmsHeaders.EXPIRATION, description = "This attribute becomes the JMSExpiration message header. Must be a long."), @ReadsAttribute(attribute = JmsHeaders.PRIORITY, description = "This attribute becomes the JMSPriority message header. Must be an integer."), @ReadsAttribute(attribute = JmsHeaders.REDELIVERED, description = "This attribute becomes the JMSRedelivered message header."), @ReadsAttribute(attribute = JmsHeaders.TIMESTAMP, description = "This attribute becomes the JMSTimestamp message header. Must be a long."), diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java index e78734ac15..36dd3828e7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java @@ -41,6 +41,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -261,6 +262,41 @@ public class JMSPublisherConsumerIT { } } + @Test + public void validateNIFI6721() throws Exception { + + final String destinationName = "validateNIFI6721"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); + + try { + ComponentLog mockLog = mock(ComponentLog.class); + JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mockLog); + Map flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(JmsHeaders.EXPIRATION, "never"); // value expected to be long, make sure non-long doesn't cause problems + publisher.publish(destinationName, "hellomq-0".getBytes(), flowFileAttributes); + Message receivedMessage = jmsTemplate.receive(destinationName); + assertEquals(0, receivedMessage.getJMSExpiration()); + + long expiration = Instant.now().toEpochMilli() + 1000 * 120; + flowFileAttributes.put(JmsHeaders.EXPIRATION, Long.toString(expiration)); + publisher.publish(destinationName, "hellomq-1".getBytes(), flowFileAttributes); + receivedMessage = jmsTemplate.receive(destinationName); + assertEquals(expiration, receivedMessage.getJMSExpiration()); + + flowFileAttributes.put(JmsHeaders.EXPIRATION, "-1"); + publisher.publish(destinationName, "hellomq-3".getBytes(), flowFileAttributes); + receivedMessage = jmsTemplate.receive(destinationName); + assertTrue(receivedMessage.getJMSExpiration() > 0); + + flowFileAttributes.put(JmsHeaders.EXPIRATION, "0"); + publisher.publish(destinationName, "hellomq-2".getBytes(), flowFileAttributes); + //assertEquals(mockLog.getWarnMessages().size(), 0); + receivedMessage = jmsTemplate.receive(destinationName); + assertEquals(0, receivedMessage.getJMSExpiration()); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } /** * At the moment the only two supported message types are TextMessage and * BytesMessage which is sufficient for the type if JMS use cases NiFi is