NIFI-7015 - ConsumeJMS now supports ObjectMessage, MapMessage and StreamMessage types as well. Added optional ERROR_QUEUE property. Result flowfiles get a 'jms.messagetype' attribute that contains the incoming message type (TextMessage, BytesMessage, ObjectMessage, MapMessage or StreamMessage).

This commit is contained in:
Tamas Palfy 2020-01-14 18:48:50 +01:00 committed by markap14
parent 421bfdd5ff
commit 103325354b
7 changed files with 704 additions and 17 deletions

View File

@ -179,6 +179,8 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
try {
rendezvousWithJms(context, session, worker);
} catch (Exception e) {
getLogger().error("Error while trying to process JMS message", e);
} finally {
//in case of exception during worker's connection (consumer or publisher),
//an appropriate service is responsible to invalidate the worker.

View File

@ -25,6 +25,8 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
@ -41,6 +43,7 @@ import org.springframework.jms.support.JmsHeaders;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -59,8 +62,9 @@ import java.util.concurrent.TimeUnit;
*/
@Tags({ "jms", "get", "message", "receive", "consume" })
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@CapabilityDescription("Consumes JMS Message of type BytesMessage or TextMessage transforming its content to "
+ "a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes.")
@CapabilityDescription("Consumes JMS Message of type BytesMessage, TextMessage, ObjectMessage, MapMessage or StreamMessage transforming its content to "
+ "a FlowFile and transitioning it to 'success' relationship. JMS attributes such as headers and properties will be copied as FlowFile attributes. "
+ "MapMessages will be transformed into JSONs and then into byte arrays. The other types will have their raw contents as byte array transferred into the flowfile.")
@WritesAttributes({
@WritesAttribute(attribute = JmsHeaders.DELIVERY_MODE, description = "The JMSDeliveryMode from the message header."),
@WritesAttribute(attribute = JmsHeaders.EXPIRATION, description = "The JMSExpiration from the message header."),
@ -72,10 +76,12 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = JmsHeaders.TYPE, description = "The JMSType from the message header."),
@WritesAttribute(attribute = JmsHeaders.REPLY_TO, description = "The JMSReplyTo from the message header."),
@WritesAttribute(attribute = JmsHeaders.DESTINATION, description = "The JMSDestination from the message header."),
@WritesAttribute(attribute = ConsumeJMS.JMS_MESSAGETYPE, description = "The JMS message type, can be TextMessage, BytesMessage, ObjectMessage, MapMessage or StreamMessage)."),
@WritesAttribute(attribute = "other attributes", description = "Each message property is written to an attribute.")
})
@SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
public static final String JMS_MESSAGETYPE = "jms.messagetype";
static final AllowableValue AUTO_ACK = new AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE),
"AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + ")",
@ -137,6 +143,14 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.defaultValue("1 sec")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor ERROR_QUEUE = new PropertyDescriptor.Builder()
.name("Error Queue Name")
.description("The name of a JMS Queue where - if set - unprocessed messages will be routed. Usually provided by the administrator (e.g., 'queue://myErrorQueue' or 'myErrorQueue')." +
"Only applicable if 'Destination Type' is set to 'QUEUE'")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -166,6 +180,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
_propertyDescriptors.add(SHARED_SUBSCRIBER);
_propertyDescriptors.add(SUBSCRIPTION_NAME);
_propertyDescriptors.add(TIMEOUT);
_propertyDescriptors.add(ERROR_QUEUE);
thisPropertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>();
@ -173,6 +188,25 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
relationships = Collections.unmodifiableSet(_relationships);
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
String destinationType = validationContext.getProperty(DESTINATION_TYPE).getValue();
String errorQueue = validationContext.getProperty(ERROR_QUEUE).getValue();
if (errorQueue != null && !QUEUE.equals(destinationType)) {
validationResults.add(new ValidationResult.Builder()
.valid(false)
.subject(ERROR_QUEUE.getDisplayName())
.explanation("'" + ERROR_QUEUE.getDisplayName() + "' is applicable only when " +
"'" + DESTINATION_TYPE.getDisplayName() + "'='" + QUEUE + "'")
.build());
}
return validationResults;
}
/**
* Will construct a {@link FlowFile} containing the body of the consumed JMS
* message (if {@link JMSResponse} returned by {@link JMSConsumer} is not
@ -183,6 +217,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
@Override
protected void rendezvousWithJms(final ProcessContext context, final ProcessSession processSession, final JMSConsumer consumer) throws ProcessException {
final String destinationName = context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
final String errorQueueName = context.getProperty(ERROR_QUEUE).evaluateAttributeExpressions().getValue();
final Boolean durableBoolean = context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
final boolean durable = durableBoolean == null ? false : durableBoolean;
final Boolean sharedBoolean = context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
@ -191,7 +226,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
try {
consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
consumer.consume(destinationName, errorQueueName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {
@ -209,6 +244,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
flowFile = processSession.putAttribute(flowFile, JMS_SOURCE_DESTINATION_NAME, destinationName);
processSession.getProvenanceReporter().receive(flowFile, destinationName);
processSession.putAttribute(flowFile, JMS_MESSAGETYPE, response.getMessageType());
processSession.transfer(flowFile, REL_SUCCESS);
processSession.commit();
}

View File

@ -25,10 +25,13 @@ import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
@ -80,7 +83,7 @@ final class JMSConsumer extends JMSWorker {
}
public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
public void consume(final String destinationName, String errorQueueName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
@ -92,28 +95,52 @@ final class JMSConsumer extends JMSWorker {
JMSResponse response = null;
if (message != null) {
byte[] messageBody = null;
String messageType;
byte[] messageBody;
try {
if (message instanceof TextMessage) {
messageType = TextMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((TextMessage) message, Charset.forName(charset));
} else if (message instanceof BytesMessage) {
messageType = BytesMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((BytesMessage) message);
} else if (message instanceof ObjectMessage) {
messageType = ObjectMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((ObjectMessage) message);
} else if (message instanceof StreamMessage) {
messageType = StreamMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((StreamMessage) message);
} else if (message instanceof MapMessage) {
messageType = MapMessage.class.getSimpleName();
messageBody = MessageBodyToBytesConverter.toBytes((MapMessage) message);
} else {
processLog.error("Received a JMS Message that was neither a TextMessage nor a BytesMessage [{}]; will skip this message.", new Object[] {message});
acknowledge(message, session);
if (errorQueueName != null) {
processLog.error("Received unsupported JMS Message type [{}]; rerouting message to error queue [{}].", new Object[] {message, errorQueueName});
jmsTemplate.send(errorQueueName, __ -> message);
} else {
processLog.error("Received unsupported JMS Message type [{}]; will skip this message.", new Object[] {message});
}
return null;
}
} catch (final MessageConversionException mce) {
processLog.error("Received a JMS Message [{}] but failed to obtain the content of the message; will acknowledge this message without creating a FlowFile for it.",
new Object[] {message}, mce);
acknowledge(message, session);
if (errorQueueName != null) {
jmsTemplate.send(errorQueueName, __ -> message);
}
return null;
}
final Map<String, String> messageHeaders = extractMessageHeaders(message);
final Map<String, String> messageProperties = extractMessageProperties(message);
response = new JMSResponse(messageBody, messageHeaders, messageProperties);
response = new JMSResponse(messageType, messageBody, messageHeaders, messageProperties);
}
// invoke the processor callback (regardless if it's null,
@ -208,15 +235,21 @@ final class JMSConsumer extends JMSWorker {
static class JMSResponse {
private final byte[] messageBody;
private final String messageType;
private final Map<String, String> messageHeaders;
private final Map<String, String> messageProperties;
JMSResponse(byte[] messageBody, Map<String, String> messageHeaders, Map<String, String> messageProperties) {
JMSResponse(String messageType, byte[] messageBody, Map<String, String> messageHeaders, Map<String, String> messageProperties) {
this.messageType = messageType;
this.messageBody = messageBody;
this.messageHeaders = Collections.unmodifiableMap(messageHeaders);
this.messageProperties = Collections.unmodifiableMap(messageProperties);
}
public String getMessageType() {
return messageType;
}
public byte[] getMessageBody() {
return this.messageBody;
}

View File

@ -16,15 +16,28 @@
*/
package org.apache.nifi.jms.processors;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageEOFException;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
/**
*
@ -57,7 +70,7 @@ abstract class MessageBodyToBytesConverter {
return message.getText().getBytes(charset);
}
} catch (JMSException e) {
throw new MessageConversionException("Failed to convert BytesMessage to byte[]", e);
throw new MessageConversionException("Failed to convert " + TextMessage.class.getSimpleName() + " to byte[]", e);
}
}
@ -71,7 +84,111 @@ abstract class MessageBodyToBytesConverter {
InputStream is = new BytesMessageInputStream(message);
return IOUtils.toByteArray(is);
} catch (Exception e) {
throw new MessageConversionException("Failed to convert BytesMessage to byte[]", e);
throw new MessageConversionException("Failed to convert " + BytesMessage.class.getSimpleName() + " to byte[]", e);
}
}
/**
*
* @param message instance of {@link ObjectMessage}
* @return byte array representing the {@link ObjectMessage}
*/
public static byte[] toBytes(ObjectMessage message) {
try {
return SerializationUtils.serialize(message.getObject());
} catch (Exception e) {
throw new MessageConversionException("Failed to convert " + ObjectMessage.class.getSimpleName() + " to byte[]", e);
}
}
/**
* @param message instance of {@link StreamMessage}
* @return byte array representing the {@link StreamMessage}
*/
public static byte[] toBytes(StreamMessage message) {
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
) {
while (true) {
try {
Object element = message.readObject();
if (element instanceof Boolean) {
dataOutputStream.writeBoolean((Boolean) element);
} else if (element instanceof byte[]) {
dataOutputStream.write((byte[]) element);
} else if (element instanceof Byte) {
dataOutputStream.writeByte((Byte) element);
} else if (element instanceof Short) {
dataOutputStream.writeShort((Short) element);
} else if (element instanceof Integer) {
dataOutputStream.writeInt((Integer) element);
} else if (element instanceof Long) {
dataOutputStream.writeLong((Long) element);
} else if (element instanceof Float) {
dataOutputStream.writeFloat((Float) element);
} else if (element instanceof Double) {
dataOutputStream.writeDouble((Double) element);
} else if (element instanceof Character) {
dataOutputStream.writeChar((Character) element);
} else if (element instanceof String) {
dataOutputStream.writeUTF((String) element);
} else {
throw new MessageConversionException("Unsupported type in " + StreamMessage.class.getSimpleName() + ": '" + element.getClass() + "'");
}
} catch (MessageEOFException mEofE) {
break;
}
}
dataOutputStream.flush();
byte[] bytes = byteArrayOutputStream.toByteArray();
return bytes;
} catch (Exception e) {
throw new MessageConversionException("Failed to convert " + StreamMessage.class.getSimpleName() + " to byte[]", e);
}
}
/**
* @param message instance of {@link MapMessage}
* @return byte array representing the {@link MapMessage}
*/
public static byte[] toBytes(MapMessage message) {
ObjectMapper objectMapper = new ObjectMapper();
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
) {
Map<String, Object> objectMap = new HashMap<>();
Enumeration mapNames = message.getMapNames();
while (mapNames.hasMoreElements()) {
String name = (String) mapNames.nextElement();
Object value = message.getObject(name);
if (value instanceof byte[]) {
byte[] bytes = (byte[]) value;
List<Byte> byteList = new ArrayList<>(bytes.length);
for (byte aByte : bytes) {
byteList.add(aByte);
}
objectMap.put(name, byteList);
} else {
objectMap.put(name, value);
}
}
objectMapper.writeValue(byteArrayOutputStream, objectMap);
byte[] jsonAsByteArray = byteArrayOutputStream.toByteArray();
return jsonAsByteArray;
} catch (JMSException e) {
throw new MessageConversionException("Couldn't read incoming " + MapMessage.class.getSimpleName(), e);
} catch (IOException e) {
throw new MessageConversionException("Couldn't transform incoming " + MapMessage.class.getSimpleName() + " to JSON", e);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -23,6 +24,8 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.util.MockFlowFile;
@ -31,8 +34,18 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.JmsHeaders;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
public class ConsumeJMSIT {
@Test
@ -55,6 +68,7 @@ public class ConsumeJMSIT {
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
runner.run(1, false);
//
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
@ -65,6 +79,8 @@ public class ConsumeJMSIT {
successFF.assertAttributeEquals("filename", "message.txt");
successFF.assertAttributeExists("attribute_from_sender");
successFF.assertAttributeEquals("attribute_from_sender", "some value");
successFF.assertAttributeExists("jms.messagetype");
successFF.assertAttributeEquals("jms.messagetype", "BytesMessage");
successFF.assertContentEquals("Hey dude!".getBytes());
String sourceDestination = successFF.getAttribute(ConsumeJMS.JMS_SOURCE_DESTINATION_NAME);
assertNotNull(sourceDestination);
@ -72,4 +88,174 @@ public class ConsumeJMSIT {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void testValidateErrorQueueWhenDestinationIsTopicAndErrorQueueIsSet() throws Exception {
testValidateErrorQueue(ConsumeJMS.TOPIC, "errorQueue", false);
}
@Test
public void testValidateErrorQueueWhenDestinationIsTopicAndErrorQueueIsNotSet() throws Exception {
testValidateErrorQueue(ConsumeJMS.TOPIC, null, true);
}
@Test
public void testValidateErrorQueueWhenDestinationIsQueueAndErrorQueueIsSet() throws Exception {
testValidateErrorQueue(ConsumeJMS.QUEUE, "errorQueue", true);
}
@Test
public void testValidateErrorQueueWhenDestinationIsQueueAndErrorQueueIsNotSet() throws Exception {
testValidateErrorQueue(ConsumeJMS.QUEUE, null, true);
}
private void testValidateErrorQueue(String destinationType, String errorQueue, boolean expectedValid) throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cfService = mock(JMSConnectionFactoryProviderDefinition.class);
when(cfService.getIdentifier()).thenReturn("cfService");
when(cfService.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
runner.addControllerService("cfService", cfService);
runner.enableControllerService(cfService);
runner.setProperty(PublishJMS.CF_SERVICE, "cfService");
runner.setProperty(ConsumeJMS.DESTINATION, "destination");
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, destinationType);
if (errorQueue != null) {
runner.setProperty(ConsumeJMS.ERROR_QUEUE, errorQueue);
}
if (expectedValid) {
runner.assertValid();
} else {
runner.assertNotValid();
}
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void testTextMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testTextMessage",
Session::createTextMessage,
TextMessage.class.getSimpleName()
);
}
@Test
public void testByteMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testByteMessage",
Session::createBytesMessage,
BytesMessage.class.getSimpleName()
);
}
@Test
public void testObjectMessageTypeAttribute() throws Exception {
String destinationName = "testObjectMessage";
testMessageTypeAttribute(
destinationName,
Session::createObjectMessage,
ObjectMessage.class.getSimpleName()
);
}
@Test
public void testStreamMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testStreamMessage",
Session::createStreamMessage,
StreamMessage.class.getSimpleName()
);
}
@Test
public void testMapMessageTypeAttribute() throws Exception {
testMessageTypeAttribute(
"testMapMessage",
Session::createMapMessage,
MapMessage.class.getSimpleName()
);
}
@Test
public void testUnsupportedMessage() throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
sender.jmsTemplate.send("testMapMessage", __ -> createUnsupportedMessage(
"unsupportedMessagePropertyKey",
"unsupportedMessagePropertyValue"
));
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(ConsumeJMS.DESTINATION, "testMapMessage");
runner.setProperty(ConsumeJMS.ERROR_QUEUE, "errorQueue");
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
runner.run(1, false);
JmsTemplate jmst = new JmsTemplate(cf);
Message message = jmst.receive("errorQueue");
assertNotNull(message);
assertEquals(message.getStringProperty("unsupportedMessagePropertyKey"), "unsupportedMessagePropertyValue");
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
private void testMessageTypeAttribute(String destinationName, final MessageCreator messageCreator, String expectedJmsMessageTypeAttribute) throws Exception {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
try {
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
sender.jmsTemplate.send(destinationName, messageCreator);
TestRunner runner = TestRunners.newTestRunner(new ConsumeJMS());
JMSConnectionFactoryProviderDefinition cs = mock(JMSConnectionFactoryProviderDefinition.class);
when(cs.getIdentifier()).thenReturn("cfProvider");
when(cs.getConnectionFactory()).thenReturn(jmsTemplate.getConnectionFactory());
runner.addControllerService("cfProvider", cs);
runner.enableControllerService(cs);
runner.setProperty(PublishJMS.CF_SERVICE, "cfProvider");
runner.setProperty(ConsumeJMS.DESTINATION, destinationName);
runner.setProperty(ConsumeJMS.DESTINATION_TYPE, ConsumeJMS.QUEUE);
runner.run(1, false);
//
final MockFlowFile successFF = runner.getFlowFilesForRelationship(PublishJMS.REL_SUCCESS).get(0);
assertNotNull(successFF);
successFF.assertAttributeExists(ConsumeJMS.JMS_MESSAGETYPE);
successFF.assertAttributeEquals(ConsumeJMS.JMS_MESSAGETYPE, expectedJmsMessageTypeAttribute);
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
public ActiveMQMessage createUnsupportedMessage(String propertyKey, String propertyValue) throws JMSException {
ActiveMQMessage message = new ActiveMQMessage();
message.setStringProperty(propertyKey, propertyValue);
return message;
}
}

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.processors;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.nifi.logging.ComponentLog;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import static org.mockito.Mockito.mock;
@Ignore("Used for manual testing.")
public class ConsumeJMSManualTest {
@Test
public void testTextMessage() throws Exception {
MessageCreator messageCreator = session -> {
TextMessage message = session.createTextMessage("textMessageContent");
return message;
};
send(messageCreator);
}
@Test
public void testBytesMessage() throws Exception {
MessageCreator messageCreator = session -> {
BytesMessage message = session.createBytesMessage();
message.writeBytes("bytesMessageContent".getBytes());
return message;
};
send(messageCreator);
}
@Test
public void testObjectMessage() throws Exception {
MessageCreator messageCreator = session -> {
ObjectMessage message = session.createObjectMessage();
message.setObject("stringAsObject");
return message;
};
send(messageCreator);
}
@Test
public void testStreamMessage() throws Exception {
MessageCreator messageCreator = session -> {
StreamMessage message = session.createStreamMessage();
message.writeBoolean(true);
message.writeByte(Integer.valueOf(1).byteValue());
message.writeBytes(new byte[] {2, 3, 4});
message.writeShort((short)32);
message.writeInt(64);
message.writeLong(128L);
message.writeFloat(1.25F);
message.writeDouble(100.867);
message.writeChar('c');
message.writeString("someString");
message.writeObject("stringAsObject");
return message;
};
send(messageCreator);
}
@Test
public void testMapMessage() throws Exception {
MessageCreator messageCreator = session -> {
MapMessage message = session.createMapMessage();
message.setBoolean("boolean", true);
message.setByte("byte", Integer.valueOf(1).byteValue());
message.setBytes("bytes", new byte[] {2, 3, 4});
message.setShort("short", (short)32);
message.setInt("int", 64);
message.setLong("long", 128L);
message.setFloat("float", 1.25F);
message.setDouble("double", 100.867);
message.setChar("char", 'c');
message.setString("string", "someString");
message.setObject("object", "stringAsObject");
return message;
};
send(messageCreator);
}
@Test
public void testUnsupportedMessage() throws Exception {
MessageCreator messageCreator = session -> new ActiveMQMessage();
send(messageCreator);
}
private void send(MessageCreator messageCreator) throws Exception {
final String destinationName = "TEST";
ConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
final ConnectionFactory connectionFactory = new CachingConnectionFactory(activeMqConnectionFactory);
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setPubSubDomain(false);
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
jmsTemplate.setReceiveTimeout(10L);
try {
JMSPublisher sender = new JMSPublisher((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
sender.jmsTemplate.send(destinationName, messageCreator);
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
}

View File

@ -16,10 +16,14 @@
*/
package org.apache.nifi.jms.processors;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
@ -29,11 +33,17 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
import org.apache.nifi.logging.ComponentLog;
@ -45,6 +55,160 @@ import org.springframework.jms.support.JmsHeaders;
public class JMSPublisherConsumerIT {
@Test
public void testObjectMessage() throws Exception {
final String destinationName = "testObjectMessage";
MessageCreator messageCreator = session -> {
ObjectMessage message = session.createObjectMessage();
message.setObject("stringAsObject");
return message;
};
ConsumerCallback responseChecker = response -> {
assertEquals(
"stringAsObject",
SerializationUtils.deserialize(response.getMessageBody())
);
};
testMessage(destinationName, messageCreator, responseChecker);
}
@Test
public void testStreamMessage() throws Exception {
final String destinationName = "testStreamMessage";
MessageCreator messageCreator = session -> {
StreamMessage message = session.createStreamMessage();
message.writeBoolean(true);
message.writeByte(Integer.valueOf(1).byteValue());
message.writeBytes(new byte[] {2, 3, 4});
message.writeShort((short)32);
message.writeInt(64);
message.writeLong(128L);
message.writeFloat(1.25F);
message.writeDouble(100.867);
message.writeChar('c');
message.writeString("someString");
message.writeObject("stringAsObject");
return message;
};
byte[] expected;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
) {
dataOutputStream.writeBoolean(true);
dataOutputStream.writeByte(1);
dataOutputStream.write(new byte[] {2, 3, 4});
dataOutputStream.writeShort((short)32);
dataOutputStream.writeInt(64);
dataOutputStream.writeLong(128L);
dataOutputStream.writeFloat(1.25F);
dataOutputStream.writeDouble(100.867);
dataOutputStream.writeChar('c');
dataOutputStream.writeUTF("someString");
dataOutputStream.writeUTF("stringAsObject");
dataOutputStream.flush();
expected = byteArrayOutputStream.toByteArray();
}
ConsumerCallback responseChecker = response -> {
byte[] actual = response.getMessageBody();
assertArrayEquals(
expected,
actual
);
};
testMessage(destinationName, messageCreator, responseChecker);
}
@Test
public void testMapMessage() throws Exception {
final String destinationName = "testObjectMessage";
MessageCreator messageCreator = session -> {
MapMessage message = session.createMapMessage();
message.setBoolean("boolean", true);
message.setByte("byte", Integer.valueOf(1).byteValue());
message.setBytes("bytes", new byte[] {2, 3, 4});
message.setShort("short", (short)32);
message.setInt("int", 64);
message.setLong("long", 128L);
message.setFloat("float", 1.25F);
message.setDouble("double", 100.867);
message.setChar("char", 'c');
message.setString("string", "someString");
message.setObject("object", "stringAsObject");
return message;
};
String expectedJson = "{" +
"\"boolean\":true," +
"\"byte\":1," +
"\"bytes\":[2, 3, 4]," +
"\"short\":32," +
"\"int\":64," +
"\"long\":128," +
"\"float\":1.25," +
"\"double\":100.867," +
"\"char\":\"c\"," +
"\"string\":\"someString\"," +
"\"object\":\"stringAsObject\"" +
"}";
testMapMessage(destinationName, messageCreator, expectedJson);
}
private void testMapMessage(String destinationName, MessageCreator messageCreator, String expectedJson) {
ConsumerCallback responseChecker = response -> {
ObjectMapper objectMapper = new ObjectMapper();
try {
Map<String, Object> actual = objectMapper.readValue(response.getMessageBody(), new TypeReference<Map<String, Object>>() {});
Map<String, Object> expected = objectMapper.readValue(expectedJson.getBytes(), new TypeReference<Map<String, Object>>() {});
assertEquals(expected, actual);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
testMessage(destinationName, messageCreator, responseChecker);
}
private void testMessage(String destinationName, MessageCreator messageCreator, ConsumerCallback responseChecker) {
JmsTemplate jmsTemplate = CommonTest.buildJmsTemplateForDestination(false);
AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
jmsTemplate.send(destinationName, messageCreator);
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, null, false, false, null, "UTF-8", response -> {
callbackInvoked.set(true);
responseChecker.accept(response);
});
assertTrue(callbackInvoked.get());
} finally {
((CachingConnectionFactory) jmsTemplate.getConnectionFactory()).destroy();
}
}
@Test
public void validateBytesConvertedToBytesMessageOnSend() throws Exception {
final String destinationName = "validateBytesConvertedToBytesMessageOnSend";
@ -116,7 +280,7 @@ public class JMSPublisherConsumerIT {
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
@ -146,7 +310,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -193,7 +357,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
consumer.consume(destinationName, false, false, null, "UTF-8", callback);
consumer.consume(destinationName, null, false, false, null, "UTF-8", callback);
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@ -232,7 +396,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@ -249,7 +413,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -268,7 +432,7 @@ public class JMSPublisherConsumerIT {
// receiving next message and fail again
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@ -290,7 +454,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
consumer.consume(destinationName, null, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {