diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 2d939446dd..cc4023b878 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -152,7 +152,7 @@ language governing permissions and limitations under the License. --> org.apache.activemq - activemq-client + activemq-all com.jayway.jsonpath diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java index 461d3816b9..d4e1969712 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JmsConsumer.java @@ -21,6 +21,7 @@ import static org.apache.nifi.processors.standard.util.JmsProperties.ACK_MODE_CL import static org.apache.nifi.processors.standard.util.JmsProperties.BATCH_SIZE; import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX; import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_NAME; +import static org.apache.nifi.processors.standard.util.JmsProperties.DESTINATION_TYPE; import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROPS_TO_ATTRIBUTES; import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER; import static org.apache.nifi.processors.standard.util.JmsProperties.MESSAGE_SELECTOR; @@ -89,6 +90,7 @@ public abstract class JmsConsumer extends AbstractProcessor { descriptors.add(USERNAME); descriptors.add(PASSWORD); descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(DESTINATION_TYPE); descriptors.add(ACKNOWLEDGEMENT_MODE); descriptors.add(MESSAGE_SELECTOR); descriptors.add(JMS_PROPS_TO_ATTRIBUTES); @@ -158,8 +160,8 @@ public abstract class JmsConsumer extends AbstractProcessor { stopWatch.stop(); if (processingSummary.getFlowFilesCreated() > 0) { - final float secs = ((float) stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F); - float messagesPerSec = ((float) processingSummary.getMessagesReceived()) / secs; + final float secs = (stopWatch.getDuration(TimeUnit.MILLISECONDS) / 1000F); + float messagesPerSec = (processingSummary.getMessagesReceived()) / secs; final String dataRate = stopWatch.calculateDataRate(processingSummary.getBytesReceived()); logger.info("Received {} messages in {} milliseconds, at a rate of {} messages/sec or {}", new Object[]{processingSummary.getMessagesReceived(), stopWatch.getDuration(TimeUnit.MILLISECONDS), messagesPerSec, dataRate}); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java index ca5df9f4eb..5f6bea514c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JmsFactory.java @@ -481,7 +481,13 @@ public class JmsFactory { attributes.put(ATTRIBUTE_PREFIX + JMS_CORRELATION_ID, message.getJMSCorrelationID()); } if (message.getJMSDestination() != null) { - attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, message.getJMSDestination().toString()); + String destinationName; + if (message.getJMSDestination() instanceof Queue) { + destinationName = ((Queue) message.getJMSDestination()).getQueueName(); + } else { + destinationName = ((Topic) message.getJMSDestination()).getTopicName(); + } + attributes.put(ATTRIBUTE_PREFIX + JMS_DESTINATION, destinationName); } if (message.getJMSMessageID() != null) { attributes.put(ATTRIBUTE_PREFIX + JMS_MESSAGE_ID, message.getJMSMessageID()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java index 9c833f54c9..dde1158473 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGetJMSQueue.java @@ -16,6 +16,11 @@ */ package org.apache.nifi.processors.standard; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + import javax.jms.BytesMessage; import javax.jms.MapMessage; import javax.jms.Message; @@ -24,61 +29,88 @@ import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; +import org.apache.nifi.processor.Relationship; import org.apache.nifi.processors.standard.util.JmsFactory; import org.apache.nifi.processors.standard.util.JmsProperties; import org.apache.nifi.processors.standard.util.WrappedMessageProducer; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.StandardProcessorTestRunner; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.apache.nifi.web.Revision; +import org.junit.Test; public class TestGetJMSQueue { - @org.junit.Ignore + @Test public void testSendTextToQueue() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); - runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); + runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false"); runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); + WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); - final Message message = jmsSession.createTextMessage("Hello World"); producer.send(message); jmsSession.commit(); + + runner.run(); + + List flowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("success").build()); + + assertTrue(flowFiles.size() == 1); + MockFlowFile successFlowFile = flowFiles.get(0); + successFlowFile.assertContentEquals("Hello World"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing"); producer.close(); jmsSession.close(); } - @org.junit.Ignore + @Test public void testSendBytesToQueue() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); - runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); + runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false"); runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); WrappedMessageProducer wrappedProducer = JmsFactory.createMessageProducer(runner.getProcessContext(), true); final Session jmsSession = wrappedProducer.getSession(); final MessageProducer producer = wrappedProducer.getProducer(); - final BytesMessage message = jmsSession.createBytesMessage(); message.writeBytes("Hello Bytes".getBytes()); producer.send(message); jmsSession.commit(); + + runner.run(); + + List flowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("success").build()); + + assertTrue(flowFiles.size() == 1); + MockFlowFile successFlowFile = flowFiles.get(0); + successFlowFile.assertContentEquals("Hello Bytes"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing"); producer.close(); jmsSession.close(); } - @org.junit.Ignore + @Test public void testSendStreamToQueue() throws Exception { - final TestRunner runner = TestRunners.newTestRunner(GetJMSQueue.class); + GetJMSQueue getJmsQueue = new GetJMSQueue(); + TestRunner runner = TestRunners.newTestRunner(getJmsQueue); runner.setProperty(JmsProperties.JMS_PROVIDER, JmsProperties.ACTIVEMQ_PROVIDER); - runner.setProperty(JmsProperties.URL, "tcp://localhost:61616"); + runner.setProperty(JmsProperties.URL, "vm://localhost?broker.persistent=false"); runner.setProperty(JmsProperties.DESTINATION_TYPE, JmsProperties.DESTINATION_TYPE_QUEUE); runner.setProperty(JmsProperties.DESTINATION_NAME, "queue.testing"); runner.setProperty(JmsProperties.ACKNOWLEDGEMENT_MODE, JmsProperties.ACK_MODE_AUTO); @@ -91,6 +123,17 @@ public class TestGetJMSQueue { producer.send(message); jmsSession.commit(); + + runner.run(); + + List flowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("success").build()); + + assertTrue(flowFiles.size() == 1); + MockFlowFile successFlowFile = flowFiles.get(0); + successFlowFile.assertContentEquals("Hello Stream"); + successFlowFile.assertAttributeEquals("jms.JMSDestination", "queue.testing"); + producer.close(); jmsSession.close(); } diff --git a/pom.xml b/pom.xml index d563408282..2ab8c94a1f 100644 --- a/pom.xml +++ b/pom.xml @@ -511,8 +511,8 @@ org.apache.activemq - activemq-client - 5.12.0 + activemq-all + 5.12.1 org.apache.lucene