From d36b76cc600b76748ffba9b94d6b702af552756f Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 7 Sep 2016 11:04:54 -0400 Subject: [PATCH] NIFI-2745 added _source destination name_ attribute to ConsumeJMS This closes #992 --- .../org/apache/nifi/jms/processors/AbstractJMSProcessor.java | 5 ++++- .../main/java/org/apache/nifi/jms/processors/ConsumeJMS.java | 3 +++ .../java/org/apache/nifi/jms/processors/ConsumeJMSTest.java | 2 ++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 398c5c1042..20937b5e97 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -114,6 +114,8 @@ abstract class AbstractJMSProcessor extends AbstractProcess private volatile CachingConnectionFactory cachingConnectionFactory; + protected volatile String destinationName; + /** * */ @@ -199,7 +201,8 @@ abstract class AbstractJMSProcessor extends AbstractProcess JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); - jmsTemplate.setDefaultDestinationName(context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); + this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); + jmsTemplate.setDefaultDestinationName(this.destinationName); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); // set of properties that may be good candidates for exposure via configuration diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index ac05f2cb8f..83e594a3c3 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -54,6 +54,8 @@ import org.springframework.jms.core.JmsTemplate; @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) public class ConsumeJMS extends AbstractJMSProcessor { + public static final String JMS_SOURCE_DESTINATION_NAME = "jms.source.destination"; + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("All FlowFiles that are received from the JMS Destination are routed to this relationship") @@ -118,6 +120,7 @@ public class ConsumeJMS extends AbstractJMSProcessor { for (Entry headersEntry : jmsHeaders.entrySet()) { attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue())); } + attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName); flowFile = processSession.putAllAttributes(flowFile, attributes); return flowFile; } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java index 9366e8d2e9..57d7dda3e7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java @@ -54,6 +54,8 @@ public class ConsumeJMSTest { assertNotNull(successFF); assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION)); successFF.assertContentEquals("Hey dude!".getBytes()); + String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); + assertNotNull(sourceDestination); ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); }