NIFI-2745 added _source destination name_ attribute to ConsumeJMS

This closes #992
This commit is contained in:
Oleg Zhurakousky 2016-09-07 11:04:54 -04:00
parent 938e32ed97
commit d36b76cc60
3 changed files with 9 additions and 1 deletions

View File

@ -114,6 +114,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
private volatile CachingConnectionFactory cachingConnectionFactory; private volatile CachingConnectionFactory cachingConnectionFactory;
protected volatile String destinationName;
/** /**
* *
*/ */
@ -199,7 +201,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
JmsTemplate jmsTemplate = new JmsTemplate(); JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
jmsTemplate.setDefaultDestinationName(this.destinationName);
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
// set of properties that may be good candidates for exposure via configuration // set of properties that may be good candidates for exposure via configuration

View File

@ -54,6 +54,8 @@ import org.springframework.jms.core.JmsTemplate;
@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> { public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination";
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
.description("All FlowFiles that are received from the JMS Destination are routed to this relationship") .description("All FlowFiles that are received from the JMS Destination are routed to this relationship")
@ -118,6 +120,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) { for (Entry<String, Object> headersEntry : jmsHeaders.entrySet()) {
attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue())); attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue()));
} }
attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName);
flowFile = processSession.putAllAttributes(flowFile, attributes); flowFile = processSession.putAllAttributes(flowFile, attributes);
return flowFile; return flowFile;
} }

View File

@ -54,6 +54,8 @@ public class ConsumeJMSTest {
assertNotNull(successFF); assertNotNull(successFF);
assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION)); assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION));
successFF.assertContentEquals("Hey dude!".getBytes()); successFF.assertContentEquals("Hey dude!".getBytes());
String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
assertNotNull(sourceDestination);
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
} }