NIFI-6721: This closes #7789. jms_expiration attribute problem fix

Originally authored in part by sjyang18 <ilsong02@hotmail.com>

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
annanys23 2023-09-08 16:25:55 +00:00 committed by Joseph Witt
parent 342fb0d876
commit 9b9dd4bae3
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
3 changed files with 58 additions and 3 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.jms.processors; package org.apache.nifi.jms.processors;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
@ -28,6 +29,7 @@ import javax.jms.JMSException;
import javax.jms.Message; import javax.jms.Message;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.time.Instant;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -79,8 +81,25 @@ class JMSPublisher extends JMSWorker {
this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue())); this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue()));
this.jmsTemplate.setExplicitQosEnabled(true); this.jmsTemplate.setExplicitQosEnabled(true);
} else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) { } else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
this.jmsTemplate.setTimeToLive(Integer.parseInt(entry.getValue())); if(NumberUtils.isCreatable(entry.getValue())) { //ignore any non-numeric values
long expiration = Long.parseLong(entry.getValue());
long ttl = 0L;
// if expiration was set to a positive non-zero value, then calculate the ttl
// jmsTemplate does not have an expiration field, and can only accept a ttl value
// which is then used to set jms_expiration header
// ttl is in epoch millis
if(expiration > 0) {
ttl = expiration - Instant.now().toEpochMilli();
if(ttl > 0) {
this.jmsTemplate.setTimeToLive(ttl);
this.jmsTemplate.setExplicitQosEnabled(true); this.jmsTemplate.setExplicitQosEnabled(true);
} // else, use default ttl
} else if (expiration == 0) { // expiration == 0 means no expiration in jms
this.jmsTemplate.setTimeToLive(0); //ttl must be explicitly set to 0 to indicate no expiration
this.jmsTemplate.setExplicitQosEnabled(true);
} // else, use default ttl
}
} else if (entry.getKey().equals(JmsHeaders.PRIORITY)) { } else if (entry.getKey().equals(JmsHeaders.PRIORITY)) {
this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue())); this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue()));
this.jmsTemplate.setExplicitQosEnabled(true); this.jmsTemplate.setExplicitQosEnabled(true);

View File

@ -85,7 +85,7 @@ import static org.apache.nifi.jms.processors.ioconcept.reader.record.ProvenanceE
+ "FlowFile attributes will be added as JMS headers and/or properties to the outgoing JMS message.") + "FlowFile attributes will be added as JMS headers and/or properties to the outgoing JMS message.")
@ReadsAttributes({ @ReadsAttributes({
@ReadsAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = "This attribute becomes the JMSDeliveryMode message header. Must be an integer."), @ReadsAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = "This attribute becomes the JMSDeliveryMode message header. Must be an integer."),
@ReadsAttribute(attribute = JmsHeaders.EXPIRATION, description = "This attribute becomes the JMSExpiration message header. Must be an integer."), @ReadsAttribute(attribute = JmsHeaders.EXPIRATION, description = "This attribute becomes the JMSExpiration message header. Must be a long."),
@ReadsAttribute(attribute = JmsHeaders.PRIORITY, description = "This attribute becomes the JMSPriority message header. Must be an integer."), @ReadsAttribute(attribute = JmsHeaders.PRIORITY, description = "This attribute becomes the JMSPriority message header. Must be an integer."),
@ReadsAttribute(attribute = JmsHeaders.REDELIVERED, description = "This attribute becomes the JMSRedelivered message header."), @ReadsAttribute(attribute = JmsHeaders.REDELIVERED, description = "This attribute becomes the JMSRedelivered message header."),
@ReadsAttribute(attribute = JmsHeaders.TIMESTAMP, description = "This attribute becomes the JMSTimestamp message header. Must be a long."), @ReadsAttribute(attribute = JmsHeaders.TIMESTAMP, description = "This attribute becomes the JMSTimestamp message header. Must be a long."),

View File

@ -41,6 +41,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -261,6 +262,41 @@ public class JMSPublisherConsumerIT {
} }
} }
@Test
public void validateNIFI6721() throws Exception {
final String destinationName = "validateNIFI6721";
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
ComponentLog mockLog = mock(ComponentLog.class);
JMSPublisher publisher = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mockLog);
Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(JmsHeaders.EXPIRATION, "never"); // value expected to be long, make sure non-long doesn't cause problems
publisher.publish(destinationName, "hellomq-0".getBytes(), flowFileAttributes);
Message receivedMessage = jmsTemplate.receive(destinationName);
assertEquals(0, receivedMessage.getJMSExpiration());
long expiration = Instant.now().toEpochMilli() + 1000 * 120;
flowFileAttributes.put(JmsHeaders.EXPIRATION, Long.toString(expiration));
publisher.publish(destinationName, "hellomq-1".getBytes(), flowFileAttributes);
receivedMessage = jmsTemplate.receive(destinationName);
assertEquals(expiration, receivedMessage.getJMSExpiration());
flowFileAttributes.put(JmsHeaders.EXPIRATION, "-1");
publisher.publish(destinationName, "hellomq-3".getBytes(), flowFileAttributes);
receivedMessage = jmsTemplate.receive(destinationName);
assertTrue(receivedMessage.getJMSExpiration() > 0);
flowFileAttributes.put(JmsHeaders.EXPIRATION, "0");
publisher.publish(destinationName, "hellomq-2".getBytes(), flowFileAttributes);
//assertEquals(mockLog.getWarnMessages().size(), 0);
receivedMessage = jmsTemplate.receive(destinationName);
assertEquals(0, receivedMessage.getJMSExpiration());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
/** /**
* At the moment the only two supported message types are TextMessage and * At the moment the only two supported message types are TextMessage and
* BytesMessage which is sufficient for the type if JMS use cases NiFi is * BytesMessage which is sufficient for the type if JMS use cases NiFi is