diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java index e7a29d350e..82ed075596 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java @@ -179,6 +179,8 @@ abstract class AbstractJMSProcessor extends AbstractProcess try { rendezvousWithJms(context, session, worker); + } catch (Exception e) { + getLogger().error("Error while trying to process JMS message", e); } finally { //in case of exception during worker's connection (consumer or publisher), //an appropriate service is responsible to invalidate the worker. diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java index eb4597cd10..fac995c989 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java @@ -25,6 +25,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; @@ -41,6 +43,7 @@ import org.springframework.jms.support.JmsHeaders; import javax.jms.Session; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -59,8 +62,9 @@ import java.util.concurrent.TimeUnit; */ @Tags({ "jms", "get", "message", "receive", "consume" }) @InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Consumes JMS Message of type BytesMessage or TextMessage transforming its content to " - + "a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes.") +@CapabilityDescription("Consumes JMS Message of type BytesMessage, TextMessage, ObjectMessage, MapMessage or StreamMessage transforming its content to " + + "a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes. " + + "MapMessages will be transformed into JSONs and then into byte arrays. The other types will have their raw contents as byte array transferred into the flowfile.") @WritesAttributes({ @WritesAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = "The JMSDeliveryMode from the message header."), @WritesAttribute(attribute = JmsHeaders.EXPIRATION, description = "The JMSExpiration from the message header."), @@ -72,10 +76,12 @@ import java.util.concurrent.TimeUnit; @WritesAttribute(attribute = JmsHeaders.TYPE, description = "The JMSType from the message header."), @WritesAttribute(attribute = JmsHeaders.REPLY_TO, description = "The JMSReplyTo from the message header."), @WritesAttribute(attribute = JmsHeaders.DESTINATION, description = "The JMSDestination from the message header."), + @WritesAttribute(attribute = ConsumeJMS.JMS_MESSAGETYPE, description = "The JMS message type, can be TextMessage, BytesMessage, ObjectMessage, MapMessage or StreamMessage)."), @WritesAttribute(attribute = "other attributes", description = "Each message property is written to an attribute.") }) @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class }) public class ConsumeJMS extends AbstractJMSProcessor { + public static final String JMS_MESSAGETYPE = "jms.messagetype"; static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE), "AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + ")", @@ -137,6 +143,14 @@ public class ConsumeJMS extends AbstractJMSProcessor { .defaultValue("1 sec") .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); + static final PropertyDescriptor ERROR_QUEUE = new PropertyDescriptor.Builder() + .name("Error Queue Name") + .description("The name of a JMS Queue where - if set - unprocessed messages will be routed. Usually provided by the administrator (e.g., 'queue://myErrorQueue' or 'myErrorQueue')." + + "Only applicable if 'Destination Type' is set to 'QUEUE'") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -166,6 +180,7 @@ public class ConsumeJMS extends AbstractJMSProcessor { _propertyDescriptors.add(SHARED_SUBSCRIBER); _propertyDescriptors.add(SUBSCRIPTION_NAME); _propertyDescriptors.add(TIMEOUT); + _propertyDescriptors.add(ERROR_QUEUE); thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); @@ -173,6 +188,25 @@ public class ConsumeJMS extends AbstractJMSProcessor { relationships = Collections.unmodifiableSet(_relationships); } + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List validationResults = new ArrayList<>(super.customValidate(validationContext)); + + String destinationType = validationContext.getProperty(DESTINATION_TYPE).getValue(); + String errorQueue = validationContext.getProperty(ERROR_QUEUE).getValue(); + + if (errorQueue != null && !QUEUE.equals(destinationType)) { + validationResults.add(new ValidationResult.Builder() + .valid(false) + .subject(ERROR_QUEUE.getDisplayName()) + .explanation("'" + ERROR_QUEUE.getDisplayName() + "' is applicable only when " + + "'" + DESTINATION_TYPE.getDisplayName() + "'='" + QUEUE + "'") + .build()); + } + + return validationResults; + } + /** * Will construct a {@link FlowFile} containing the body of the consumed JMS * message (if {@link JMSResponse} returned by {@link JMSConsumer} is not @@ -183,6 +217,7 @@ public class ConsumeJMS extends AbstractJMSProcessor { @Override protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException { final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue(); + final String errorQueueName = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue(); final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); final boolean durable = durableBoolean == null ? false : durableBoolean; final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean(); @@ -191,7 +226,7 @@ public class ConsumeJMS extends AbstractJMSProcessor { final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue(); try { - consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() { + consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() { @Override public void accept(final JMSResponse response) { if (response == null) { @@ -209,6 +244,7 @@ public class ConsumeJMS extends AbstractJMSProcessor { flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName); processSession.getProvenanceReporter().receive(flowFile, destinationName); + processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType()); processSession.transfer(flowFile, REL_SUCCESS); processSession.commit(); } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java index 3c16ee26eb..ac9fbce7f5 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java @@ -25,10 +25,13 @@ import java.util.Map; import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; @@ -80,7 +83,7 @@ final class JMSConsumer extends JMSWorker { } - public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset, + public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriberName, final String charset, final ConsumerCallback consumerCallback) { this.jmsTemplate.execute(new SessionCallback() { @Override @@ -92,28 +95,52 @@ final class JMSConsumer extends JMSWorker { JMSResponse response = null; if (message != null) { - byte[] messageBody = null; + String messageType; + byte[] messageBody; try { if (message instanceof TextMessage) { + messageType = TextMessage.class.getSimpleName(); messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset)); } else if (message instanceof BytesMessage) { + messageType = BytesMessage.class.getSimpleName(); messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message); + } else if (message instanceof ObjectMessage) { + messageType = ObjectMessage.class.getSimpleName(); + messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message); + } else if (message instanceof StreamMessage) { + messageType = StreamMessage.class.getSimpleName(); + messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message); + } else if (message instanceof MapMessage) { + messageType = MapMessage.class.getSimpleName(); + messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message); } else { - processLog.error("Received a JMS Message that was neither a TextMessage nor a BytesMessage [{}]; will skip this message.", new Object[] {message}); acknowledge(message, session); + + if (errorQueueName != null) { + processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, errorQueueName}); + jmsTemplate.send(errorQueueName, __ -> message); + } else { + processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message}); + } + return null; } } catch (final MessageConversionException mce) { processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.", new Object[] {message}, mce); acknowledge(message, session); + + if (errorQueueName != null) { + jmsTemplate.send(errorQueueName, __ -> message); + } + return null; } final Map messageHeaders = extractMessageHeaders(message); final Map messageProperties = extractMessageProperties(message); - response = new JMSResponse(messageBody, messageHeaders, messageProperties); + response = new JMSResponse(messageType, messageBody, messageHeaders, messageProperties); } // invoke the processor callback (regardless if it's null, @@ -208,15 +235,21 @@ final class JMSConsumer extends JMSWorker { static class JMSResponse { private final byte[] messageBody; + private final String messageType; private final Map messageHeaders; private final Map messageProperties; - JMSResponse(byte[] messageBody, Map messageHeaders, Map messageProperties) { + JMSResponse(String messageType, byte[] messageBody, Map messageHeaders, Map messageProperties) { + this.messageType = messageType; this.messageBody = messageBody; this.messageHeaders = Collections.unmodifiableMap(messageHeaders); this.messageProperties = Collections.unmodifiableMap(messageProperties); } + public String getMessageType() { + return messageType; + } + public byte[] getMessageBody() { return this.messageBody; } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java index 7dd77e48eb..c8cd9adbc9 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/MessageBodyToBytesConverter.java @@ -16,15 +16,28 @@ */ package org.apache.nifi.jms.processors; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import javax.jms.BytesMessage; import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageEOFException; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; import javax.jms.TextMessage; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.SerializationUtils; /** * @@ -57,7 +70,7 @@ abstract class MessageBodyToBytesConverter { return message.getText().getBytes(charset); } } catch (JMSException e) { - throw new MessageConversionException("Failed to convert BytesMessage to byte[]", e); + throw new MessageConversionException("Failed to convert " + TextMessage.class.getSimpleName() + " to byte[]", e); } } @@ -71,7 +84,111 @@ abstract class MessageBodyToBytesConverter { InputStream is = new BytesMessageInputStream(message); return IOUtils.toByteArray(is); } catch (Exception e) { - throw new MessageConversionException("Failed to convert BytesMessage to byte[]", e); + throw new MessageConversionException("Failed to convert " + BytesMessage.class.getSimpleName() + " to byte[]", e); + } + } + + /** + * + * @param message instance of {@link ObjectMessage} + * @return byte array representing the {@link ObjectMessage} + */ + public static byte[] toBytes(ObjectMessage message) { + try { + return SerializationUtils.serialize(message.getObject()); + } catch (Exception e) { + throw new MessageConversionException("Failed to convert " + ObjectMessage.class.getSimpleName() + " to byte[]", e); + } + } + + /** + * @param message instance of {@link StreamMessage} + * @return byte array representing the {@link StreamMessage} + */ + public static byte[] toBytes(StreamMessage message) { + try ( + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + ) { + while (true) { + try { + Object element = message.readObject(); + if (element instanceof Boolean) { + dataOutputStream.writeBoolean((Boolean) element); + } else if (element instanceof byte[]) { + dataOutputStream.write((byte[]) element); + } else if (element instanceof Byte) { + dataOutputStream.writeByte((Byte) element); + } else if (element instanceof Short) { + dataOutputStream.writeShort((Short) element); + } else if (element instanceof Integer) { + dataOutputStream.writeInt((Integer) element); + } else if (element instanceof Long) { + dataOutputStream.writeLong((Long) element); + } else if (element instanceof Float) { + dataOutputStream.writeFloat((Float) element); + } else if (element instanceof Double) { + dataOutputStream.writeDouble((Double) element); + } else if (element instanceof Character) { + dataOutputStream.writeChar((Character) element); + } else if (element instanceof String) { + dataOutputStream.writeUTF((String) element); + } else { + throw new MessageConversionException("Unsupported type in " + StreamMessage.class.getSimpleName() + ": '" + element.getClass() + "'"); + } + } catch (MessageEOFException mEofE) { + break; + } + } + + dataOutputStream.flush(); + + byte[] bytes = byteArrayOutputStream.toByteArray(); + + return bytes; + } catch (Exception e) { + throw new MessageConversionException("Failed to convert " + StreamMessage.class.getSimpleName() + " to byte[]", e); + } + } + + /** + * @param message instance of {@link MapMessage} + * @return byte array representing the {@link MapMessage} + */ + public static byte[] toBytes(MapMessage message) { + ObjectMapper objectMapper = new ObjectMapper(); + + try ( + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + ) { + Map objectMap = new HashMap<>(); + + Enumeration mapNames = message.getMapNames(); + + while (mapNames.hasMoreElements()) { + String name = (String) mapNames.nextElement(); + Object value = message.getObject(name); + if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + List byteList = new ArrayList<>(bytes.length); + for (byte aByte : bytes) { + byteList.add(aByte); + } + objectMap.put(name, byteList); + } else { + objectMap.put(name, value); + } + } + + objectMapper.writeValue(byteArrayOutputStream, objectMap); + + byte[] jsonAsByteArray = byteArrayOutputStream.toByteArray(); + + return jsonAsByteArray; + } catch (JMSException e) { + throw new MessageConversionException("Couldn't read incoming " + MapMessage.class.getSimpleName(), e); + } catch (IOException e) { + throw new MessageConversionException("Couldn't transform incoming " + MapMessage.class.getSimpleName() + " to JSON", e); } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java index d3012e92de..f40f60c6b7 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.jms.processors; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -23,6 +24,8 @@ import static org.mockito.Mockito.when; import java.util.HashMap; import java.util.Map; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.util.MockFlowFile; @@ -31,8 +34,18 @@ import org.apache.nifi.util.TestRunners; import org.junit.Test; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; import org.springframework.jms.support.JmsHeaders; +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + public class ConsumeJMSIT { @Test @@ -55,6 +68,7 @@ public class ConsumeJMSIT { runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); runner.setProperty(ConsumeJMS.DESTINATION, destinationName); runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); + runner.run(1, false); // final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); @@ -65,6 +79,8 @@ public class ConsumeJMSIT { successFF.assertAttributeEquals("filename", "message.txt"); successFF.assertAttributeExists("attribute_from_sender"); successFF.assertAttributeEquals("attribute_from_sender", "some value"); + successFF.assertAttributeExists("jms.messagetype"); + successFF.assertAttributeEquals("jms.messagetype", "BytesMessage"); successFF.assertContentEquals("Hey dude!".getBytes()); String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME); assertNotNull(sourceDestination); @@ -72,4 +88,174 @@ public class ConsumeJMSIT { ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); } } + + @Test + public void testValidateErrorQueueWhenDestinationIsTopicAndErrorQueueIsSet() throws Exception { + testValidateErrorQueue(ConsumeJMS.TOPIC, "errorQueue", false); + } + + @Test + public void testValidateErrorQueueWhenDestinationIsTopicAndErrorQueueIsNotSet() throws Exception { + testValidateErrorQueue(ConsumeJMS.TOPIC, null, true); + } + + @Test + public void testValidateErrorQueueWhenDestinationIsQueueAndErrorQueueIsSet() throws Exception { + testValidateErrorQueue(ConsumeJMS.QUEUE, "errorQueue", true); + } + + @Test + public void testValidateErrorQueueWhenDestinationIsQueueAndErrorQueueIsNotSet() throws Exception { + testValidateErrorQueue(ConsumeJMS.QUEUE, null, true); + } + + private void testValidateErrorQueue(String destinationType, String errorQueue, boolean expectedValid) throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); + + try { + TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); + + JMSConnectionFactoryProviderDefinition cfService = mock(JMSConnectionFactoryProviderDefinition.class); + when(cfService.getIdentifier()).thenReturn("cfService"); + when(cfService.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); + + runner.addControllerService("cfService", cfService); + runner.enableControllerService(cfService); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfService"); + runner.setProperty(ConsumeJMS.DESTINATION, "destination"); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, destinationType); + if (errorQueue != null) { + runner.setProperty(ConsumeJMS.ERROR_QUEUE, errorQueue); + } + + if (expectedValid) { + runner.assertValid(); + } else { + runner.assertNotValid(); + } + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } + + @Test + public void testTextMessageTypeAttribute() throws Exception { + testMessageTypeAttribute( + "testTextMessage", + Session::createTextMessage, + TextMessage.class.getSimpleName() + ); + } + + @Test + public void testByteMessageTypeAttribute() throws Exception { + testMessageTypeAttribute( + "testByteMessage", + Session::createBytesMessage, + BytesMessage.class.getSimpleName() + ); + } + + @Test + public void testObjectMessageTypeAttribute() throws Exception { + String destinationName = "testObjectMessage"; + + testMessageTypeAttribute( + destinationName, + Session::createObjectMessage, + ObjectMessage.class.getSimpleName() + ); + } + + @Test + public void testStreamMessageTypeAttribute() throws Exception { + testMessageTypeAttribute( + "testStreamMessage", + Session::createStreamMessage, + StreamMessage.class.getSimpleName() + ); + } + + @Test + public void testMapMessageTypeAttribute() throws Exception { + testMessageTypeAttribute( + "testMapMessage", + Session::createMapMessage, + MapMessage.class.getSimpleName() + ); + } + + @Test + public void testUnsupportedMessage() throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); + try { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + + JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + + sender.jmsTemplate.send("testMapMessage", __ -> createUnsupportedMessage( + "unsupportedMessagePropertyKey", + "unsupportedMessagePropertyValue" + )); + + TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(ConsumeJMS.DESTINATION, "testMapMessage"); + runner.setProperty(ConsumeJMS.ERROR_QUEUE, "errorQueue"); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); + runner.run(1, false); + + JmsTemplate jmst = new JmsTemplate(cf); + Message message = jmst.receive("errorQueue"); + + assertNotNull(message); + assertEquals(message.getStringProperty("unsupportedMessagePropertyKey"), "unsupportedMessagePropertyValue"); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } + + private void testMessageTypeAttribute(String destinationName, final MessageCreator messageCreator, String expectedJmsMessageTypeAttribute) throws Exception { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); + try { + JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + + sender.jmsTemplate.send(destinationName, messageCreator); + + TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS()); + JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class); + when(cs.getIdentifier()).thenReturn("cfProvider"); + when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory()); + runner.addControllerService("cfProvider", cs); + runner.enableControllerService(cs); + + runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider"); + runner.setProperty(ConsumeJMS.DESTINATION, destinationName); + runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE); + runner.run(1, false); + // + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0); + assertNotNull(successFF); + + successFF.assertAttributeExists(ConsumeJMS.JMS_MESSAGETYPE); + successFF.assertAttributeEquals(ConsumeJMS.JMS_MESSAGETYPE, expectedJmsMessageTypeAttribute); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } + + public ActiveMQMessage createUnsupportedMessage(String propertyKey, String propertyValue) throws JMSException { + ActiveMQMessage message = new ActiveMQMessage(); + + message.setStringProperty(propertyKey, propertyValue); + + return message; + } } diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSManualTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSManualTest.java new file mode 100644 index 0000000000..061f18cdb8 --- /dev/null +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSManualTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.jms.processors; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.nifi.logging.ComponentLog; +import org.junit.Ignore; +import org.junit.Test; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.MessageCreator; + +import javax.jms.BytesMessage; +import javax.jms.ConnectionFactory; +import javax.jms.MapMessage; +import javax.jms.ObjectMessage; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import static org.mockito.Mockito.mock; + +@Ignore("Used for manual testing.") +public class ConsumeJMSManualTest { + @Test + public void testTextMessage() throws Exception { + MessageCreator messageCreator = session -> { + TextMessage message = session.createTextMessage("textMessageContent"); + + return message; + }; + + send(messageCreator); + } + + @Test + public void testBytesMessage() throws Exception { + MessageCreator messageCreator = session -> { + BytesMessage message = session.createBytesMessage(); + + message.writeBytes("bytesMessageContent".getBytes()); + + return message; + }; + + send(messageCreator); + } + + @Test + public void testObjectMessage() throws Exception { + MessageCreator messageCreator = session -> { + ObjectMessage message = session.createObjectMessage(); + + message.setObject("stringAsObject"); + + return message; + }; + + send(messageCreator); + } + + @Test + public void testStreamMessage() throws Exception { + MessageCreator messageCreator = session -> { + StreamMessage message = session.createStreamMessage(); + + message.writeBoolean(true); + message.writeByte(Integer.valueOf(1).byteValue()); + message.writeBytes(new byte[] {2, 3, 4}); + message.writeShort((short)32); + message.writeInt(64); + message.writeLong(128L); + message.writeFloat(1.25F); + message.writeDouble(100.867); + message.writeChar('c'); + message.writeString("someString"); + message.writeObject("stringAsObject"); + + return message; + }; + + send(messageCreator); + } + + @Test + public void testMapMessage() throws Exception { + MessageCreator messageCreator = session -> { + MapMessage message = session.createMapMessage(); + + message.setBoolean("boolean", true); + message.setByte("byte", Integer.valueOf(1).byteValue()); + message.setBytes("bytes", new byte[] {2, 3, 4}); + message.setShort("short", (short)32); + message.setInt("int", 64); + message.setLong("long", 128L); + message.setFloat("float", 1.25F); + message.setDouble("double", 100.867); + message.setChar("char", 'c'); + message.setString("string", "someString"); + message.setObject("object", "stringAsObject"); + + return message; + }; + + send(messageCreator); + } + + @Test + public void testUnsupportedMessage() throws Exception { + MessageCreator messageCreator = session -> new ActiveMQMessage(); + + send(messageCreator); + } + + private void send(MessageCreator messageCreator) throws Exception { + final String destinationName = "TEST"; + + ConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + final ConnectionFactory connectionFactory = new CachingConnectionFactory(activeMqConnectionFactory); + + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + jmsTemplate.setPubSubDomain(false); + jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); + jmsTemplate.setReceiveTimeout(10L); + + try { + JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + + sender.jmsTemplate.send(destinationName, messageCreator); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } +} diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java index bc480a2056..11dc94a47b 100644 --- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java +++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java @@ -16,10 +16,14 @@ */ package org.apache.nifi.jms.processors; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; @@ -29,11 +33,17 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.BytesMessage; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; +import javax.jms.ObjectMessage; import javax.jms.Session; +import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.SerializationUtils; import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback; import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse; import org.apache.nifi.logging.ComponentLog; @@ -45,6 +55,160 @@ import org.springframework.jms.support.JmsHeaders; public class JMSPublisherConsumerIT { + @Test + public void testObjectMessage() throws Exception { + final String destinationName = "testObjectMessage"; + + MessageCreator messageCreator = session -> { + ObjectMessage message = session.createObjectMessage(); + + message.setObject("stringAsObject"); + + return message; + }; + + ConsumerCallback responseChecker = response -> { + assertEquals( + "stringAsObject", + SerializationUtils.deserialize(response.getMessageBody()) + ); + }; + + testMessage(destinationName, messageCreator, responseChecker); + } + + @Test + public void testStreamMessage() throws Exception { + final String destinationName = "testStreamMessage"; + + MessageCreator messageCreator = session -> { + StreamMessage message = session.createStreamMessage(); + + message.writeBoolean(true); + message.writeByte(Integer.valueOf(1).byteValue()); + message.writeBytes(new byte[] {2, 3, 4}); + message.writeShort((short)32); + message.writeInt(64); + message.writeLong(128L); + message.writeFloat(1.25F); + message.writeDouble(100.867); + message.writeChar('c'); + message.writeString("someString"); + message.writeObject("stringAsObject"); + + return message; + }; + + byte[] expected; + try ( + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + ) { + dataOutputStream.writeBoolean(true); + dataOutputStream.writeByte(1); + dataOutputStream.write(new byte[] {2, 3, 4}); + dataOutputStream.writeShort((short)32); + dataOutputStream.writeInt(64); + dataOutputStream.writeLong(128L); + dataOutputStream.writeFloat(1.25F); + dataOutputStream.writeDouble(100.867); + dataOutputStream.writeChar('c'); + dataOutputStream.writeUTF("someString"); + dataOutputStream.writeUTF("stringAsObject"); + + dataOutputStream.flush(); + + expected = byteArrayOutputStream.toByteArray(); + } + + ConsumerCallback responseChecker = response -> { + byte[] actual = response.getMessageBody(); + + assertArrayEquals( + expected, + actual + ); + }; + + testMessage(destinationName, messageCreator, responseChecker); + } + + @Test + public void testMapMessage() throws Exception { + final String destinationName = "testObjectMessage"; + + MessageCreator messageCreator = session -> { + MapMessage message = session.createMapMessage(); + + message.setBoolean("boolean", true); + message.setByte("byte", Integer.valueOf(1).byteValue()); + message.setBytes("bytes", new byte[] {2, 3, 4}); + message.setShort("short", (short)32); + message.setInt("int", 64); + message.setLong("long", 128L); + message.setFloat("float", 1.25F); + message.setDouble("double", 100.867); + message.setChar("char", 'c'); + message.setString("string", "someString"); + message.setObject("object", "stringAsObject"); + + return message; + }; + + String expectedJson = "{" + + "\"boolean\":true," + + "\"byte\":1," + + "\"bytes\":[2, 3, 4]," + + "\"short\":32," + + "\"int\":64," + + "\"long\":128," + + "\"float\":1.25," + + "\"double\":100.867," + + "\"char\":\"c\"," + + "\"string\":\"someString\"," + + "\"object\":\"stringAsObject\"" + + "}"; + + testMapMessage(destinationName, messageCreator, expectedJson); + } + + private void testMapMessage(String destinationName, MessageCreator messageCreator, String expectedJson) { + ConsumerCallback responseChecker = response -> { + ObjectMapper objectMapper = new ObjectMapper(); + + try { + Map actual = objectMapper.readValue(response.getMessageBody(), new TypeReference>() {}); + Map expected = objectMapper.readValue(expectedJson.getBytes(), new TypeReference>() {}); + + assertEquals(expected, actual); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + + testMessage(destinationName, messageCreator, responseChecker); + } + + private void testMessage(String destinationName, MessageCreator messageCreator, ConsumerCallback responseChecker) { + JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false); + + AtomicBoolean callbackInvoked = new AtomicBoolean(); + + try { + jmsTemplate.send(destinationName, messageCreator); + + JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); + consumer.consume(destinationName, null, false, false, null, "UTF-8", response -> { + callbackInvoked.set(true); + responseChecker.accept(response); + }); + + assertTrue(callbackInvoked.get()); + } finally { + ((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy(); + } + } + @Test public void validateBytesConvertedToBytesMessageOnSend() throws Exception { final String destinationName = "validateBytesConvertedToBytesMessageOnSend"; @@ -116,7 +280,7 @@ public class JMSPublisherConsumerIT { }); JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); - consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() { + consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() { @Override public void accept(JMSResponse response) { // noop @@ -146,7 +310,7 @@ public class JMSPublisherConsumerIT { JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); final AtomicBoolean callbackInvoked = new AtomicBoolean(); - consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() { + consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() { @Override public void accept(JMSResponse response) { callbackInvoked.set(true); @@ -193,7 +357,7 @@ public class JMSPublisherConsumerIT { JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class)); for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) { - consumer.consume(destinationName, false, false, null, "UTF-8", callback); + consumer.consume(destinationName, null, false, false, null, "UTF-8", callback); } } finally { ((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy(); @@ -232,7 +396,7 @@ public class JMSPublisherConsumerIT { JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class)); final AtomicBoolean callbackInvoked = new AtomicBoolean(); try { - consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() { + consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() { @Override public void accept(JMSResponse response) { callbackInvoked.set(true); @@ -249,7 +413,7 @@ public class JMSPublisherConsumerIT { // should receive the same message, but will process it successfully while (!callbackInvoked.get()) { - consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() { + consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() { @Override public void accept(JMSResponse response) { if (response == null) { @@ -268,7 +432,7 @@ public class JMSPublisherConsumerIT { // receiving next message and fail again try { while (!callbackInvoked.get()) { - consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() { + consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() { @Override public void accept(JMSResponse response) { if (response == null) { @@ -290,7 +454,7 @@ public class JMSPublisherConsumerIT { // should receive the same message, but will process it successfully try { while (!callbackInvoked.get()) { - consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() { + consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() { @Override public void accept(JMSResponse response) { if (response == null) {