mirror of https://github.com/apache/nifi.git
NIFI-5660: JMSPublisher should set some header properties in JmsTemplate instead of directly in the message
NIFI-5660: Added lines to integration test to verify these header properties (added by Mike Moser) Signed-off-by: Mike Moser <mosermw@apache.org> This closes #3053
This commit is contained in:
parent
270b6d5897
commit
a0c28ee19e
|
@ -33,6 +33,7 @@ import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapte
|
|||
import org.springframework.jms.core.JmsTemplate;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Message;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -177,6 +178,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
|
|||
}
|
||||
}
|
||||
if (worker != null) {
|
||||
worker.jmsTemplate.setExplicitQosEnabled(false);
|
||||
worker.jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
|
||||
worker.jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
|
||||
worker.jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
|
||||
workerPool.offer(worker);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,11 +84,14 @@ final class JMSPublisher extends JMSWorker {
|
|||
for (Entry<String, String> entry : flowFileAttributesToSend.entrySet()) {
|
||||
try {
|
||||
if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) {
|
||||
message.setJMSDeliveryMode(Integer.parseInt(entry.getValue()));
|
||||
this.jmsTemplate.setDeliveryMode(Integer.parseInt(entry.getValue()));
|
||||
this.jmsTemplate.setExplicitQosEnabled(true);
|
||||
} else if (entry.getKey().equals(JmsHeaders.EXPIRATION)) {
|
||||
message.setJMSExpiration(Integer.parseInt(entry.getValue()));
|
||||
this.jmsTemplate.setTimeToLive(Integer.parseInt(entry.getValue()));
|
||||
this.jmsTemplate.setExplicitQosEnabled(true);
|
||||
} else if (entry.getKey().equals(JmsHeaders.PRIORITY)) {
|
||||
message.setJMSPriority(Integer.parseInt(entry.getValue()));
|
||||
this.jmsTemplate.setPriority(Integer.parseInt(entry.getValue()));
|
||||
this.jmsTemplate.setExplicitQosEnabled(true);
|
||||
} else if (entry.getKey().equals(JmsHeaders.REDELIVERED)) {
|
||||
message.setJMSRedelivered(Boolean.parseBoolean(entry.getValue()));
|
||||
} else if (entry.getKey().equals(JmsHeaders.TIMESTAMP)) {
|
||||
|
|
|
@ -77,6 +77,8 @@ public class JMSPublisherConsumerIT {
|
|||
flowFileAttributes.put("illegal-property", "value");
|
||||
flowFileAttributes.put("another.illegal", "value");
|
||||
flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic");
|
||||
flowFileAttributes.put(JmsHeaders.DELIVERY_MODE, "1");
|
||||
flowFileAttributes.put(JmsHeaders.PRIORITY, "1");
|
||||
flowFileAttributes.put(JmsHeaders.EXPIRATION, "never"); // value expected to be integer, make sure non-integer doesn't cause problems
|
||||
publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes);
|
||||
|
||||
|
@ -86,6 +88,8 @@ public class JMSPublisherConsumerIT {
|
|||
assertFalse(receivedMessage.propertyExists("illegal-property"));
|
||||
assertFalse(receivedMessage.propertyExists("another.illegal"));
|
||||
assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic);
|
||||
assertEquals(1, receivedMessage.getJMSDeliveryMode());
|
||||
assertEquals(1, receivedMessage.getJMSPriority());
|
||||
assertEquals("myTopic", ((Topic) receivedMessage.getJMSReplyTo()).getTopicName());
|
||||
|
||||
} finally {
|
||||
|
|
Loading…
Reference in New Issue