From c23867605857f8380140ba4d69437b58585e9cba Mon Sep 17 00:00:00 2001 From: Joey Frazee Date: Mon, 19 Sep 2016 08:18:30 -0500 Subject: [PATCH] NIFI-2789, NIFI-2790 - Read JMS properties and add to FlowFile attributes in ConsumeJMS Remove unused assertEquals import Move destination from default to send/receive to support EL better --- .../jms/processors/AbstractJMSProcessor.java | 1 - .../nifi/jms/processors/ConsumeJMS.java | 13 +++--- .../nifi/jms/processors/JMSConsumer.java | 4 +- .../nifi/jms/processors/JMSPublisher.java | 10 ++--- .../nifi/jms/processors/PublishJMS.java | 3 +- .../nifi/jms/processors/CommonTest.java | 3 +- .../nifi/jms/processors/ConsumeJMSTest.java | 21 +++++++--- .../processors/JMSPublisherConsumerTest.java | 30 ++++++------- .../nifi/jms/processors/PublishJMSTest.java | 42 +++++++++++++++++-- 9 files changed, 89 insertions(+), 38 deletions(-) diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index 20937b5e97..d7c40f7643 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -202,7 +202,6 @@ abstract class AbstractJMSProcessor extends AbstractProcess JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory(this.cachingConnectionFactory); this.destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); - jmsTemplate.setDefaultDestinationName(this.destinationName); jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); // set of properties that may be good candidates for exposure via configuration diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index 83e594a3c3..cdd5fcd4b7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -78,7 +78,8 @@ public class ConsumeJMS extends AbstractJMSProcessor { */ @Override protected void rendezvousWithJms(ProcessContext context, ProcessSession processSession) throws ProcessException { - final JMSResponse response = this.targetResource.consume(); + final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); + final JMSResponse response = this.targetResource.consume(destinationName); if (response != null){ FlowFile flowFile = processSession.create(); flowFile = processSession.write(flowFile, new OutputStreamCallback() { @@ -88,7 +89,9 @@ public class ConsumeJMS extends AbstractJMSProcessor { } }); Map jmsHeaders = response.getMessageHeaders(); - flowFile = this.updateFlowFileAttributesWithJmsHeaders(jmsHeaders, flowFile, processSession); + Map jmsProperties = Collections.unmodifiableMap(response.getMessageProperties()); + flowFile = this.updateFlowFileAttributesWithMap(jmsHeaders, flowFile, processSession); + flowFile = this.updateFlowFileAttributesWithMap(jmsProperties, flowFile, processSession); processSession.getProvenanceReporter().receive(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); processSession.transfer(flowFile, REL_SUCCESS); } else { @@ -115,10 +118,10 @@ public class ConsumeJMS extends AbstractJMSProcessor { /** * */ - private FlowFile updateFlowFileAttributesWithJmsHeaders(Map jmsHeaders, FlowFile flowFile, ProcessSession processSession) { + private FlowFile updateFlowFileAttributesWithMap(Map map, FlowFile flowFile, ProcessSession processSession) { Map attributes = new HashMap(); - for (Entry headersEntry : jmsHeaders.entrySet()) { - attributes.put(headersEntry.getKey(), String.valueOf(headersEntry.getValue())); + for (Entry entry : map.entrySet()) { + attributes.put(entry.getKey(), String.valueOf(entry.getValue())); } attributes.put(JMS_SOURCE_DESTINATION_NAME, this.destinationName); flowFile = processSession.putAllAttributes(flowFile, attributes); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java index d88b3488d4..1525738ca1 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -61,8 +61,8 @@ final class JMSConsumer extends JMSWorker { /** * */ - public JMSResponse consume() { - Message message = this.jmsTemplate.receive(); + public JMSResponse consume(final String destinationName) { + Message message = this.jmsTemplate.receive(destinationName); if (message != null) { byte[] messageBody = null; try { diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java index cc9ee7f8ac..49355f4573 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSPublisher.java @@ -63,8 +63,8 @@ final class JMSPublisher extends JMSWorker { * * @param messageBytes byte array representing contents of the message */ - void publish(byte[] messageBytes) { - this.publish(messageBytes, null); + void publish(final String destinationName, byte[] messageBytes) { + this.publish(destinationName, messageBytes, null); } /** @@ -74,8 +74,8 @@ final class JMSPublisher extends JMSWorker { * @param flowFileAttributes * Map representing {@link FlowFile} attributes. */ - void publish(final byte[] messageBytes, final Map flowFileAttributes) { - this.jmsTemplate.send(new MessageCreator() { + void publish(final String destinationName, final byte[] messageBytes, final Map flowFileAttributes) { + this.jmsTemplate.send(destinationName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { BytesMessage message = session.createBytesMessage(); @@ -83,7 +83,7 @@ final class JMSPublisher extends JMSWorker { if (flowFileAttributes != null && !flowFileAttributes.isEmpty()) { // set message headers and properties for (Entry entry : flowFileAttributes.entrySet()) { - if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-")) {// '-' is illegal char in JMS prop names + if (!entry.getKey().startsWith(JmsHeaders.PREFIX) && !entry.getKey().contains("-") && !entry.getKey().contains(".")) {// '-' and '.' are illegal char in JMS prop names message.setStringProperty(entry.getKey(), entry.getValue()); } else if (entry.getKey().equals(JmsHeaders.DELIVERY_MODE)) { message.setJMSDeliveryMode(Integer.parseInt(entry.getValue())); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java index 0802b439d4..36f7e868bb 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java @@ -98,7 +98,8 @@ public class PublishJMS extends AbstractJMSProcessor { FlowFile flowFile = processSession.get(); if (flowFile != null) { try { - this.targetResource.publish(this.extractMessageBody(flowFile, processSession), + final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions(flowFile).getValue(); + this.targetResource.publish(destinationName, this.extractMessageBody(flowFile, processSession), flowFile.getAttributes()); processSession.transfer(flowFile, REL_SUCCESS); processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue()); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java index 6f0cf3937b..8a69a1487f 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java @@ -48,13 +48,12 @@ public class CommonTest { assertTrue(consumeJmsPresent); } - static JmsTemplate buildJmsTemplateForDestination(String destinationName, boolean pubSub) { + static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm://localhost?broker.persistent=false"); CachingConnectionFactory cf = new CachingConnectionFactory(connectionFactory); JmsTemplate jmsTemplate = new JmsTemplate(cf); - jmsTemplate.setDefaultDestinationName(destinationName); jmsTemplate.setPubSubDomain(pubSub); return jmsTemplate; } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java index 57d7dda3e7..e9364d28f8 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSTest.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.jms.processors; +import java.util.HashMap; +import java.util.Map; + import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.util.MockFlowFile; @@ -26,7 +29,6 @@ import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.JmsHeaders; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -35,9 +37,13 @@ public class ConsumeJMSTest { @Test public void validateSuccessfulConsumeAndTransferToSuccess() throws Exception { - JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("cooQueue", false); + final String destinationName = "cooQueue"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); JMSPublisher sender = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); - sender.publish("Hey dude!".getBytes()); + final Map senderAttributes = new HashMap<>(); + senderAttributes.put("filename", "message.txt"); + senderAttributes.put("attribute_from_sender", "some value"); + sender.publish(destinationName, "Hey dude!".getBytes(), senderAttributes); TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); when(cs.getIdentifier()).thenReturn("cfProvider"); @@ -46,13 +52,18 @@ public class ConsumeJMSTest { runner.enableControllerService(cs); runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(ConsumeJMS.DESTINATION, "cooQueue"); + runner.setProperty(ConsumeJMS.DESTINATION, destinationName); runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); runner.run(1, false); // final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); assertNotNull(successFF); - assertEquals("cooQueue", successFF.getAttributes().get(JmsHeaders.DESTINATION)); + successFF.assertAttributeExists(JmsHeaders.DESTINATION); + successFF.assertAttributeEquals(JmsHeaders.DESTINATION, destinationName); + successFF.assertAttributeExists("filename"); + successFF.assertAttributeEquals("filename", "message.txt"); + successFF.assertAttributeExists("attribute_from_sender"); + successFF.assertAttributeEquals("attribute_from_sender", "some value"); successFF.assertContentEquals("Hey dude!".getBytes()); String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); assertNotNull(sourceDestination); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java index 1aa528899d..be097fe41d 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java @@ -42,12 +42,13 @@ public class JMSPublisherConsumerTest { @Test public void validateByesConvertedToBytesMessageOnSend() throws Exception { - JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + final String destinationName = "testQueue"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); - publisher.publish("hellomq".getBytes()); + publisher.publish(destinationName, "hellomq".getBytes()); - Message receivedMessage = jmsTemplate.receive(); + Message receivedMessage = jmsTemplate.receive(destinationName); assertTrue(receivedMessage instanceof BytesMessage); byte[] bytes = new byte[7]; ((BytesMessage) receivedMessage).readBytes(bytes); @@ -58,15 +59,16 @@ public class JMSPublisherConsumerTest { @Test public void validateJmsHeadersAndPropertiesAreTransferredFromFFAttributes() throws Exception { - JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + final String destinationName = "testQueue"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); JMSPublisher publisher = new JMSPublisher(jmsTemplate, mock(ComponentLog.class)); Map flowFileAttributes = new HashMap<>(); flowFileAttributes.put("foo", "foo"); flowFileAttributes.put(JmsHeaders.REPLY_TO, "myTopic"); - publisher.publish("hellomq".getBytes(), flowFileAttributes); + publisher.publish(destinationName, "hellomq".getBytes(), flowFileAttributes); - Message receivedMessage = jmsTemplate.receive(); + Message receivedMessage = jmsTemplate.receive(destinationName); assertTrue(receivedMessage instanceof BytesMessage); assertEquals("foo", receivedMessage.getStringProperty("foo")); assertTrue(receivedMessage.getJMSReplyTo() instanceof Topic); @@ -83,9 +85,10 @@ public class JMSPublisherConsumerTest { */ @Test(expected = IllegalStateException.class) public void validateFailOnUnsupportedMessageType() throws Exception { - JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + final String destinationName = "testQueue"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - jmsTemplate.send(new MessageCreator() { + jmsTemplate.send(destinationName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(); @@ -94,7 +97,7 @@ public class JMSPublisherConsumerTest { JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); try { - consumer.consume(); + consumer.consume(destinationName); } finally { ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); } @@ -102,9 +105,10 @@ public class JMSPublisherConsumerTest { @Test public void validateConsumeWithCustomHeadersAndProperties() throws Exception { - JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination("testQueue", false); + final String destinationName = "testQueue"; + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); - jmsTemplate.send(new MessageCreator() { + jmsTemplate.send(destinationName, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage("hello from the other side"); @@ -116,9 +120,7 @@ public class JMSPublisherConsumerTest { }); JMSConsumer consumer = new JMSConsumer(jmsTemplate, mock(ComponentLog.class)); - assertEquals("JMSConsumer[destination:testQueue; pub-sub:false;]", consumer.toString()); - - JMSResponse response = consumer.consume(); + JMSResponse response = consumer.consume(destinationName); assertEquals("hello from the other side", new String(response.getMessageBody())); assertEquals("fooQueue", response.getMessageHeaders().get(JmsHeaders.REPLY_TO)); assertEquals("foo", response.getMessageProperties().get("foo")); diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java index 36edf79a64..f7ccf17ee7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/PublishJMSTest.java @@ -43,6 +43,7 @@ public class PublishJMSTest { public void validateSuccessfulPublishAndTransferToSuccess() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + final String destinationName = "fooQueue"; PublishJMS pubProc = new PublishJMS(); TestRunner runner = TestRunners.newTestRunner(pubProc); JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); @@ -53,7 +54,7 @@ public class PublishJMSTest { runner.enableControllerService(cs); runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); - runner.setProperty(PublishJMS.DESTINATION, "fooQueue"); + runner.setProperty(PublishJMS.DESTINATION, destinationName); Map attributes = new HashMap<>(); attributes.put("foo", "foo"); @@ -65,8 +66,43 @@ public class PublishJMSTest { assertNotNull(successFF); JmsTemplate jmst = new JmsTemplate(cf); - jmst.setDefaultDestinationName("fooQueue"); - BytesMessage message = (BytesMessage) jmst.receive(); + BytesMessage message = (BytesMessage) jmst.receive(destinationName); + + byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message); + assertEquals("Hey dude!", new String(messageBytes)); + assertEquals("cooQueue", ((Queue) message.getJMSReplyTo()).getQueueName()); + assertEquals("foo", message.getStringProperty("foo")); + } + + @Test + public void validateSuccessfulPublishAndTransferToSuccessWithEL() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + final String destinationNameExpression = "${foo}Queue"; + final String destinationName = "fooQueue"; + PublishJMS pubProc = new PublishJMS(); + TestRunner runner = TestRunners.newTestRunner(pubProc); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(cf); + + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(PublishJMS.DESTINATION, destinationNameExpression); + + Map attributes = new HashMap<>(); + attributes.put("foo", "foo"); + attributes.put(JmsHeaders.REPLY_TO, "cooQueue"); + runner.enqueue("Hey dude!".getBytes(), attributes); + runner.run(1, false); + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + + JmsTemplate jmst = new JmsTemplate(cf); + BytesMessage message = (BytesMessage) jmst.receive(destinationName); byte[] messageBytes = MessageBodyToBytesConverter.toBytes(message); assertEquals("Hey dude!", new String(messageBytes));