Refactor transformer to better map AMQP messages to JMS message types
and better preserve the original encoding of stored messages so that
they can be sent back to an AMQP client with expected content types.
Adds additional interoperability tests.
This commit is contained in:
Timothy Bish 2016-07-25 18:15:53 -04:00
parent 3953b9aaef
commit d54e21b2ff
21 changed files with 3023 additions and 357 deletions

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -20,7 +20,7 @@ import javax.jms.Message;
public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
public AMQPNativeInboundTransformer(JMSVendor vendor) {
public AMQPNativeInboundTransformer(ActiveMQJMSVendor vendor) {
super(vendor);
}
@ -35,12 +35,13 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
}
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
Message rc = super.transform(amqpMessage);
Message result = super.doTransform(amqpMessage);
populateMessage(rc, amqp);
return rc;
populateMessage(result, amqp);
return result;
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -32,7 +32,7 @@ import org.apache.qpid.proton.message.ProtonJMessage;
public class AMQPNativeOutboundTransformer extends OutboundTransformer {
public AMQPNativeOutboundTransformer(JMSVendor vendor) {
public AMQPNativeOutboundTransformer(ActiveMQJMSVendor vendor) {
super(vendor);
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -22,7 +22,7 @@ import javax.jms.Message;
public class AMQPRawInboundTransformer extends InboundTransformer {
public AMQPRawInboundTransformer(JMSVendor vendor) {
public AMQPRawInboundTransformer(ActiveMQJMSVendor vendor) {
super(vendor);
}
@ -33,28 +33,27 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
@Override
public InboundTransformer getFallbackTransformer() {
return null; // No fallback from full raw transform
return null; // No fallback from full raw transform, message likely dropped.
}
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
BytesMessage rc = vendor.createBytesMessage();
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
protected Message doTransform(EncodedMessage amqpMessage) throws Exception {
BytesMessage result = vendor.createBytesMessage(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
// We cannot decode the message headers to check so err on the side of caution
// and mark all messages as persistent.
rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
rc.setJMSPriority(defaultPriority);
result.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
result.setJMSPriority(defaultPriority);
final long now = System.currentTimeMillis();
rc.setJMSTimestamp(now);
result.setJMSTimestamp(now);
if (defaultTtl > 0) {
rc.setJMSExpiration(now + defaultTtl);
result.setJMSExpiration(now + defaultTtl);
}
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
result.setBooleanProperty(prefixVendor + "NATIVE", true);
return rc;
return result;
}
}

View File

@ -16,10 +16,18 @@
*/
package org.apache.activemq.transport.amqp.message;
import java.io.DataInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.zip.InflaterInputStream;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageNotWriteableException;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
@ -31,71 +39,357 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
public class ActiveMQJMSVendor implements JMSVendor {
public class ActiveMQJMSVendor {
final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
private ActiveMQJMSVendor() {
}
@Override
public BytesMessage createBytesMessage() {
return new ActiveMQBytesMessage();
}
@Override
public StreamMessage createStreamMessage() {
return new ActiveMQStreamMessage();
}
@Override
/**
* @return a new vendor specific Message instance.
*/
public Message createMessage() {
return new ActiveMQMessage();
}
@Override
/**
* @return a new vendor specific BytesMessage instance.
*/
public BytesMessage createBytesMessage() {
return new ActiveMQBytesMessage();
}
/**
* @return a new vendor specific BytesMessage instance with the given payload.
*/
public BytesMessage createBytesMessage(byte[] content, int offset, int length) {
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
message.setContent(new ByteSequence(content, offset, length));
return message;
}
/**
* @return a new vendor specific StreamMessage instance.
*/
public StreamMessage createStreamMessage() {
return new ActiveMQStreamMessage();
}
/**
* @return a new vendor specific TextMessage instance.
*/
public TextMessage createTextMessage() {
return new ActiveMQTextMessage();
}
@Override
/**
* @return a new vendor specific TextMessage instance with the given string in the body.
*/
public TextMessage createTextMessage(String text) {
ActiveMQTextMessage message = new ActiveMQTextMessage();
try {
message.setText(text);
} catch (MessageNotWriteableException ex) {}
return message;
}
/**
* @return a new vendor specific ObjectMessage instance.
*/
public ObjectMessage createObjectMessage() {
return new ActiveMQObjectMessage();
}
@Override
/**
* @return a new vendor specific ObjectMessage instance with the serialized form given.
*/
public ObjectMessage createObjectMessage(byte[] content, int offset, int length) {
ActiveMQObjectMessage message = new ActiveMQObjectMessage();
message.setContent(new ByteSequence(content, offset, length));
return message;
}
/**
* @return a new vendor specific MapMessage instance.
*/
public MapMessage createMapMessage() {
return new ActiveMQMapMessage();
}
@Override
/**
* @return a new vendor specific MapMessage instance with the given map as its content.
*/
public MapMessage createMapMessage(Map<String, Object> content) throws JMSException {
ActiveMQMapMessage message = new ActiveMQMapMessage();
final Set<Map.Entry<String, Object>> set = content.entrySet();
for (Map.Entry<String, Object> entry : set) {
message.setObject(entry.getKey(), entry.getValue());
}
return message;
}
/**
* Creates a new JMS Destination instance from the given name.
*
* @param name
* the name to use to construct the new Destination
*
* @return a new JMS Destination object derived from the given name.
*/
public Destination createDestination(String name) {
return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
}
@Override
/**
* Set the given value as the JMSXUserID on the message instance.
*
* @param message
* the message to be updated.
* @param value
* the value to apply to the message.
*/
public void setJMSXUserID(Message msg, String value) {
((ActiveMQMessage) msg).setUserID(value);
}
@Override
/**
* Set the given value as the JMSXGroupID on the message instance.
*
* @param message
* the message to be updated.
* @param value
* the value to apply to the message.
*/
public void setJMSXGroupID(Message msg, String value) {
((ActiveMQMessage) msg).setGroupID(value);
}
@Override
/**
* Set the given value as the JMSXGroupSequence on the message instance.
*
* @param message
* the message to be updated.
* @param value
* the value to apply to the message.
*/
public void setJMSXGroupSequence(Message msg, int value) {
((ActiveMQMessage) msg).setGroupSequence(value);
}
@Override
/**
* Set the given value as the JMSXDeliveryCount on the message instance.
*
* @param message
* the message to be updated.
* @param value
* the value to apply to the message.
*/
public void setJMSXDeliveryCount(Message msg, long value) {
((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
}
@Override
/**
* Convert the given JMS Destination into the appropriate AMQP address string
* for assignment to the 'to' or 'replyTo' field of an AMQP message.
*
* @param destination
* the JMS Destination instance to be converted.
*
* @return the converted string address to assign to the message.
*/
public String toAddress(Destination dest) {
return ((ActiveMQDestination) dest).getQualifiedName();
}
/**
* Given an Message instance return the original Message ID that was assigned the
* Message when it was first processed by the broker. For an AMQP message this
* should be the original value of the message's MessageId field with the correct
* type preserved.
*
* @param message
* the message which is being accessed.
*
* @return the original MessageId assigned to this Message instance.
*/
public Object getOriginalMessageId(Message message) {
Object result;
MessageId msgId = ((ActiveMQMessage)message).getMessageId();
if (msgId.getTextView() != null) {
try {
result = AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView());
} catch (AmqpProtocolException e) {
result = msgId.getTextView().toString();
}
} else {
result = msgId.toString();
}
return result;
}
/**
* Return the encoded form of the BytesMessage as an AMQP Binary instance.
*
* @param message
* the Message whose binary encoded body is needed.
*
* @return a Binary instance containing the encoded message body.
*
* @throws JMSException if an error occurs while fetching the binary payload.
*/
public Binary getBinaryFromMessageBody(BytesMessage message) throws JMSException {
ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) message;
Binary result = null;
if (bytesMessage.getContent() != null) {
ByteSequence contents = bytesMessage.getContent();
if (bytesMessage.isCompressed()) {
int length = (int) bytesMessage.getBodyLength();
byte[] uncompressed = new byte[length];
bytesMessage.readBytes(uncompressed);
result = new Binary(uncompressed);
} else {
return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
}
}
return result;
}
/**
* Return the encoded form of the BytesMessage as an AMQP Binary instance.
*
* @param message
* the Message whose binary encoded body is needed.
*
* @return a Binary instance containing the encoded message body.
*
* @throws JMSException if an error occurs while fetching the binary payload.
*/
public Binary getBinaryFromMessageBody(ObjectMessage message) throws JMSException {
ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) message;
Binary result = null;
if (objectMessage.getContent() != null) {
ByteSequence contents = objectMessage.getContent();
if (objectMessage.isCompressed()) {
try (ByteArrayOutputStream os = new ByteArrayOutputStream();
ByteArrayInputStream is = new ByteArrayInputStream(contents);
InflaterInputStream iis = new InflaterInputStream(is);) {
byte value;
while ((value = (byte) iis.read()) != -1) {
os.write(value);
}
ByteSequence expanded = os.toByteSequence();
result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength());
} catch (Exception cause) {
throw JMSExceptionSupport.create(cause);
}
} else {
return new Binary(contents.getData(), contents.getOffset(), contents.getLength());
}
}
return result;
}
/**
* Return the encoded form of the Message as an AMQP Binary instance.
*
* @param message
* the Message whose binary encoded body is needed.
*
* @return a Binary instance containing the encoded message body.
*
* @throws JMSException if an error occurs while fetching the binary payload.
*/
public Binary getBinaryFromMessageBody(TextMessage message) throws JMSException {
ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message;
Binary result = null;
if (textMessage.getContent() != null) {
ByteSequence contents = textMessage.getContent();
if (textMessage.isCompressed()) {
try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
InflaterInputStream iis = new InflaterInputStream(is);
DataInputStream dis = new DataInputStream(iis);) {
int size = dis.readInt();
byte[] uncompressed = new byte[size];
dis.readFully(uncompressed);
result = new Binary(uncompressed);
} catch (Exception cause) {
throw JMSExceptionSupport.create(cause);
}
} else {
// Message includes a size prefix of four bytes for the OpenWire marshaler
result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4);
}
}
return result;
}
/**
* Return the underlying Map from the JMS MapMessage instance.
*
* @param message
* the MapMessage whose underlying Map is requested.
*
* @return the underlying Map used to store the value in the given MapMessage.
*
* @throws JMSException if an error occurs in constructing or fetching the Map.
*/
public Map<String, Object> getMapFromMessageBody(MapMessage message) throws JMSException {
final HashMap<String, Object> map = new HashMap<String, Object>();
final ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message;
final Map<String, Object> contentMap = mapMessage.getContentMap();
if (contentMap != null) {
map.putAll(contentMap);
}
return contentMap;
}
/**
* Sets the given Message Property on the given message overriding any read-only
* state on the Message long enough for the property to be added.
*
* @param message
* the message to set the property on.
* @param key
* the String key for the new Message property
* @param value
* the Object to assign to the new Message property.
*
* @throws JMSException if an error occurs while setting the property.
*/
public void setMessageProperty(Message message, String key, Object value) throws JMSException {
final ActiveMQMessage amqMessage = (ActiveMQMessage) message;
boolean oldValue = amqMessage.isReadOnlyProperties();
amqMessage.setReadOnlyProperties(false);
amqMessage.setObjectProperty(key, value);
amqMessage.setReadOnlyProperties(oldValue);
}
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.util.StringTokenizer;
public final class AmqpContentTypeSupport {
private static final String UTF_8 = "UTF-8";
private static final String CHARSET = "charset";
private static final String TEXT = "text";
private static final String APPLICATION = "application";
private static final String JAVASCRIPT = "javascript";
private static final String XML = "xml";
private static final String XML_VARIANT = "+xml";
private static final String JSON = "json";
private static final String JSON_VARIANT = "+json";
private static final String XML_DTD = "xml-dtd";
private static final String ECMASCRIPT = "ecmascript";
/**
* @param contentType
* the contentType of the received message
* @return the character set to use, or null if not to treat the message as
* text
* @throws InvalidContentTypeException
* if the content-type is invalid in some way.
*/
public static Charset parseContentTypeForTextualCharset(final String contentType) throws InvalidContentTypeException {
if (contentType == null || contentType.trim().isEmpty()) {
throw new InvalidContentTypeException("Content type can't be null or empty");
}
int subTypeSeparator = contentType.indexOf("/");
if (subTypeSeparator == -1) {
throw new InvalidContentTypeException("Content type has no '/' separator: " + contentType);
}
final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim();
String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
String parameterPart = null;
int parameterSeparator = subTypePart.indexOf(";");
if (parameterSeparator != -1) {
if (parameterSeparator < subTypePart.length() - 1) {
parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
}
subTypePart = subTypePart.substring(0, parameterSeparator).trim();
}
if (subTypePart.isEmpty()) {
throw new InvalidContentTypeException("Content type has no subtype after '/'" + contentType);
}
final String subType = subTypePart;
if (isTextual(type, subType)) {
String charset = findCharset(parameterPart);
if (charset == null) {
charset = UTF_8;
}
if (UTF_8.equals(charset)) {
return StandardCharsets.UTF_8;
} else {
try {
return Charset.forName(charset);
} catch (IllegalCharsetNameException icne) {
throw new InvalidContentTypeException("Illegal charset: " + charset);
} catch (UnsupportedCharsetException uce) {
throw new InvalidContentTypeException("Unsupported charset: " + charset);
}
}
}
return null;
}
//----- Internal Content Type utilities ----------------------------------//
private static boolean isTextual(String type, String subType) {
if (TEXT.equals(type)) {
return true;
}
if (APPLICATION.equals(type)) {
if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT)
|| XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) {
return true;
}
}
return false;
}
private static String findCharset(String paramaterPart) {
String charset = null;
if (paramaterPart != null) {
StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";");
while (tokenizer.hasMoreTokens()) {
String parameter = tokenizer.nextToken().trim();
int eqIndex = parameter.indexOf('=');
if (eqIndex != -1) {
String name = parameter.substring(0, eqIndex);
if (CHARSET.equalsIgnoreCase(name.trim())) {
String value = unquote(parameter.substring(eqIndex + 1));
charset = value.toUpperCase();
break;
}
}
}
}
return charset;
}
private static String unquote(String s) {
if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) {
return s.substring(1, s.length() - 1);
} else {
return s;
}
}
}

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.Map;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.message.Message;
/**
* Support class containing constant values and static methods that are
* used to map to / from AMQP Message types being sent or received.
*/
public final class AmqpMessageSupport {
public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
public static final Data NULL_OBJECT_BODY;
public static final String AMQP_ORIGINAL_ENCODING_KEY = "JMS_AMQP_ORIGINAL_ENCODING";
public static final short AMQP_UNKNOWN = 0;
public static final short AMQP_NULL = 1;
public static final short AMQP_DATA = 2;
public static final short AMQP_SEQUENCE = 3;
public static final short AMQP_VALUE_NULL = 4;
public static final short AMQP_VALUE_STRING = 5;
public static final short AMQP_VALUE_BINARY = 6;
public static final short AMQP_VALUE_MAP = 7;
public static final short AMQP_VALUE_LIST = 8;
static {
byte[] bytes;
try {
bytes = getSerializedBytes(null);
} catch (IOException e) {
throw new RuntimeException("Failed to initialise null object body", e);
}
NULL_OBJECT_BODY = new Data(new Binary(bytes));
}
/**
* Content type used to mark Data sections as containing a serialized java object.
*/
public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object";
/**
* Content type used to mark Data sections as containing arbitrary bytes.
*/
public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
/**
* Lookup and return the correct Proton Symbol instance based on the given key.
*
* @param key
* the String value name of the Symbol to locate.
*
* @return the Symbol value that matches the given key.
*/
public static Symbol getSymbol(String key) {
return Symbol.valueOf(key);
}
/**
* Safe way to access message annotations which will check internal structure and
* either return the annotation if it exists or null if the annotation or any annotations
* are present.
*
* @param key
* the String key to use to lookup an annotation.
* @param message
* the AMQP message object that is being examined.
*
* @return the given annotation value or null if not present in the message.
*/
public static Object getMessageAnnotation(String key, Message message) {
if (message != null && message.getMessageAnnotations() != null) {
Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
return annotations.get(AmqpMessageSupport.getSymbol(key));
}
return null;
}
/**
* Check whether the content-type field of the properties section (if present) in
* the given message matches the provided string (where null matches if there is
* no content type present.
*
* @param contentType
* content type string to compare against, or null if none
* @param message
* the AMQP message object that is being examined.
*
* @return true if content type matches
*/
public static boolean isContentType(String contentType, Message message) {
if (contentType == null) {
return message.getContentType() == null;
} else {
return contentType.equals(message.getContentType());
}
}
/**
* @param contentType the contentType of the received message
* @return the character set to use, or null if not to treat the message as text
*/
public static Charset getCharsetForTextualContent(String contentType) {
try {
return AmqpContentTypeSupport.parseContentTypeForTextualCharset(contentType);
} catch (InvalidContentTypeException e) {
return null;
}
}
private static byte[] getSerializedBytes(Serializable value) throws IOException {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(value);
oos.flush();
oos.close();
return baos.toByteArray();
}
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -23,18 +23,21 @@ public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
private final JMSMappingOutboundTransformer transformer;
public AutoOutboundTransformer(JMSVendor vendor) {
public AutoOutboundTransformer(ActiveMQJMSVendor vendor) {
super(vendor);
transformer = new JMSMappingOutboundTransformer(vendor);
}
@Override
public EncodedMessage transform(Message msg) throws Exception {
if( msg == null )
if (msg == null) {
return null;
if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
if( msg instanceof BytesMessage ) {
return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
}
if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
if (msg instanceof BytesMessage) {
return AMQPNativeOutboundTransformer.transform(this, (BytesMessage) msg);
} else {
return null;
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -22,7 +22,7 @@ import org.apache.qpid.proton.message.Message;
public class EncodedMessage {
private final Binary data;
final long messageFormat;
private final long messageFormat;
public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
this.data = new Binary(data, offset, length);

View File

@ -24,6 +24,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Decimal128;
import org.apache.qpid.proton.amqp.Decimal32;
@ -41,31 +42,51 @@ import org.apache.qpid.proton.amqp.messaging.Properties;
public abstract class InboundTransformer {
JMSVendor vendor;
protected final ActiveMQJMSVendor vendor;
public static final String TRANSFORMER_NATIVE = "native";
public static final String TRANSFORMER_RAW = "raw";
public static final String TRANSFORMER_JMS = "jms";
String prefixVendor = "JMS_AMQP_";
String prefixDeliveryAnnotations = "DA_";
String prefixMessageAnnotations = "MA_";
String prefixFooter = "FT_";
protected String prefixVendor = "JMS_AMQP_";
protected String prefixDeliveryAnnotations = "DA_";
protected String prefixMessageAnnotations = "MA_";
protected String prefixFooter = "FT_";
int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
protected int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
protected int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
protected long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
public InboundTransformer(JMSVendor vendor) {
public InboundTransformer(ActiveMQJMSVendor vendor) {
this.vendor = vendor;
}
public abstract Message transform(EncodedMessage amqpMessage) throws Exception;
public abstract String getTransformerName();
public abstract InboundTransformer getFallbackTransformer();
public final Message transform(EncodedMessage amqpMessage) throws Exception {
InboundTransformer transformer = this;
Message message = null;
while (transformer != null) {
try {
message = transformer.doTransform(amqpMessage);
break;
} catch (Exception e) {
transformer = transformer.getFallbackTransformer();
}
}
if (message == null) {
throw new AmqpProtocolException("Failed to transform incoming delivery, skipping.", false);
}
return message;
}
protected abstract Message doTransform(EncodedMessage amqpMessage) throws Exception;
public int getDefaultDeliveryMode() {
return defaultDeliveryMode;
}
@ -98,14 +119,10 @@ public abstract class InboundTransformer {
this.prefixVendor = prefixVendor;
}
public JMSVendor getVendor() {
public ActiveMQJMSVendor getVendor() {
return vendor;
}
public void setVendor(JMSVendor vendor) {
this.vendor = vendor;
}
@SuppressWarnings("unchecked")
protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
Header header = amqp.getHeader();

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
public class InvalidContentTypeException extends Exception {
private static final long serialVersionUID = 1260362376856866687L;
public InvalidContentTypeException(String message) {
super(message);
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -16,27 +16,41 @@
*/
package org.apache.activemq.transport.amqp.message;
import java.io.Serializable;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_MAP;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_NULL;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getCharsetForTextualContent;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.isContentType;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.jms.BytesMessage;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.message.Message;
public class JMSMappingInboundTransformer extends InboundTransformer {
public JMSMappingInboundTransformer(JMSVendor vendor) {
public JMSMappingInboundTransformer(ActiveMQJMSVendor vendor) {
super(vendor);
}
@ -50,70 +64,113 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
return new AMQPNativeInboundTransformer(getVendor());
}
@SuppressWarnings({ "unchecked" })
@Override
public Message transform(EncodedMessage amqpMessage) throws Exception {
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
protected javax.jms.Message doTransform(EncodedMessage amqpMessage) throws Exception {
Message amqp = amqpMessage.decode();
javax.jms.Message result = createMessage(amqp, amqpMessage);
result.setJMSDeliveryMode(defaultDeliveryMode);
result.setJMSPriority(defaultPriority);
result.setJMSExpiration(defaultTtl);
populateMessage(result, amqp);
result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
result.setBooleanProperty(prefixVendor + "NATIVE", false);
return result;
}
@SuppressWarnings({ "unchecked" })
private javax.jms.Message createMessage(Message message, EncodedMessage original) throws Exception {
Section body = message.getBody();
javax.jms.Message result;
Message rc;
final Section body = amqp.getBody();
if (body == null) {
rc = vendor.createMessage();
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
result = vendor.createObjectMessage();
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
result = vendor.createBytesMessage();
} else {
Charset charset = getCharsetForTextualContent(message.getContentType());
if (charset != null) {
result = vendor.createTextMessage();
} else {
result = vendor.createMessage();
}
}
result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
} else if (body instanceof Data) {
Binary d = ((Data) body).getValue();
BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
rc = m;
Binary payload = ((Data) body).getValue();
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
} else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
} else {
Charset charset = getCharsetForTextualContent(message.getContentType());
if (StandardCharsets.UTF_8.equals(charset)) {
ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
try {
CharBuffer chars = charset.newDecoder().decode(buf);
result = vendor.createTextMessage(String.valueOf(chars));
} catch (CharacterCodingException e) {
result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
}
} else {
result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
}
}
result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
} else if (body instanceof AmqpSequence) {
AmqpSequence sequence = (AmqpSequence) body;
StreamMessage m = vendor.createStreamMessage();
for (Object item : sequence.getValue()) {
m.writeObject(item);
}
rc = m;
result = m;
result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
} else if (body instanceof AmqpValue) {
Object value = ((AmqpValue) body).getValue();
if (value == null) {
rc = vendor.createObjectMessage();
}
if (value instanceof String) {
TextMessage m = vendor.createTextMessage();
m.setText((String) value);
rc = m;
if (value == null || value instanceof String) {
result = vendor.createTextMessage((String) value);
result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
} else if (value instanceof Binary) {
Binary d = (Binary) value;
BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
rc = m;
Binary payload = (Binary) value;
if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
} else {
result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength());
}
result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
} else if (value instanceof List) {
StreamMessage m = vendor.createStreamMessage();
for (Object item : (List<Object>) value) {
m.writeObject(item);
}
rc = m;
result = m;
result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_LIST);
} else if (value instanceof Map) {
MapMessage m = vendor.createMapMessage();
final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
for (Map.Entry<String, Object> entry : set) {
m.setObject(entry.getKey(), entry.getValue());
}
rc = m;
result = vendor.createMapMessage((Map<String, Object>) value);
result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_MAP);
} else {
ObjectMessage m = vendor.createObjectMessage();
m.setObject((Serializable) value);
rc = m;
// Trigger fall-back to native encoder which generates BytesMessage with the
// original message stored in the message body.
throw new AmqpProtocolException("Unable to encode to ActiveMQ JMS Message", false);
}
} else {
throw new RuntimeException("Unexpected body type: " + body.getClass());
}
rc.setJMSDeliveryMode(defaultDeliveryMode);
rc.setJMSPriority(defaultPriority);
rc.setJMSExpiration(defaultTtl);
populateMessage(rc, amqp);
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
return rc;
return result;
}
}

View File

@ -16,12 +16,24 @@
*/
package org.apache.activemq.transport.amqp.message;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@ -39,8 +51,6 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@ -81,7 +91,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
public JMSMappingOutboundTransformer(JMSVendor vendor) {
public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) {
super(vendor);
}
@ -121,145 +131,107 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
/**
* Perform the conversion between JMS Message and Proton Message without
* re-encoding it to array. This is needed because some frameworks may elect
* to do this on their own way (Netty for instance using Nettybuffers)
* to do this on their own way.
*
* @param msg
* @return
* @throws Exception
* @param message
* The message to transform into an AMQP version for dispatch.
*
* @return an AMQP Message that represents the given JMS Message.
*
* @throws Exception if an error occurs during the conversion.
*/
public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException {
Header header = new Header();
Properties props = new Properties();
HashMap<Symbol, Object> daMap = null;
HashMap<Symbol, Object> maMap = null;
HashMap apMap = null;
Map<Symbol, Object> daMap = null;
Map<Symbol, Object> maMap = null;
Map<String,Object> apMap = null;
Map<Object, Object> footerMap = null;
Section body = null;
HashMap footerMap = null;
if (msg instanceof BytesMessage) {
BytesMessage m = (BytesMessage) msg;
byte data[] = new byte[(int) m.getBodyLength()];
m.readBytes(data);
m.reset(); // Need to reset after readBytes or future readBytes
// calls (ex: redeliveries) will fail and return -1
body = new Data(new Binary(data));
}
if (msg instanceof TextMessage) {
body = new AmqpValue(((TextMessage) msg).getText());
}
if (msg instanceof MapMessage) {
final HashMap<String, Object> map = new HashMap<String, Object>();
final MapMessage m = (MapMessage) msg;
final Enumeration<String> names = m.getMapNames();
while (names.hasMoreElements()) {
String key = names.nextElement();
map.put(key, m.getObject(key));
}
body = new AmqpValue(map);
}
if (msg instanceof StreamMessage) {
ArrayList<Object> list = new ArrayList<Object>();
final StreamMessage m = (StreamMessage) msg;
try {
while (true) {
list.add(m.readObject());
}
} catch (MessageEOFException e) {
}
body = new AmqpSequence(list);
}
if (msg instanceof ObjectMessage) {
body = new AmqpValue(((ObjectMessage) msg).getObject());
}
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
if (msg.getJMSType() != null) {
props.setSubject(msg.getJMSType());
}
if (msg.getJMSMessageID() != null) {
ActiveMQMessage amqMsg = (ActiveMQMessage) msg;
body = convertBody(message);
MessageId msgId = amqMsg.getMessageId();
if (msgId.getTextView() != null) {
try {
props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
} catch (AmqpProtocolException e) {
props.setMessageId(msgId.getTextView().toString());
}
} else {
props.setMessageId(msgId.toString());
}
header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
header.setPriority(new UnsignedByte((byte) message.getJMSPriority()));
if (message.getJMSType() != null) {
props.setSubject(message.getJMSType());
}
if (msg.getJMSDestination() != null) {
props.setTo(vendor.toAddress(msg.getJMSDestination()));
if (message.getJMSMessageID() != null) {
props.setMessageId(vendor.getOriginalMessageId(message));
}
if (message.getJMSDestination() != null) {
props.setTo(vendor.toAddress(message.getJMSDestination()));
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
}
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination()));
// Deprecated: used by legacy QPid AMQP 1.0 JMS client
maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination()));
}
if (msg.getJMSReplyTo() != null) {
props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
if (message.getJMSReplyTo() != null) {
props.setReplyTo(vendor.toAddress(message.getJMSReplyTo()));
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
}
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo()));
// Deprecated: used by legacy QPid AMQP 1.0 JMS client
maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo()));
}
if (msg.getJMSCorrelationID() != null) {
String correlationId = msg.getJMSCorrelationID();
if (message.getJMSCorrelationID() != null) {
String correlationId = message.getJMSCorrelationID();
try {
props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
} catch (AmqpProtocolException e) {
props.setCorrelationId(correlationId);
}
}
if (msg.getJMSExpiration() != 0) {
long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
if (message.getJMSExpiration() != 0) {
long ttl = message.getJMSExpiration() - System.currentTimeMillis();
if (ttl < 0) {
ttl = 1;
}
header.setTtl(new UnsignedInteger((int) ttl));
props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration()));
}
if (msg.getJMSTimestamp() != 0) {
props.setCreationTime(new Date(msg.getJMSTimestamp()));
if (message.getJMSTimestamp() != 0) {
props.setCreationTime(new Date(message.getJMSTimestamp()));
}
final Enumeration<String> keys = msg.getPropertyNames();
@SuppressWarnings("unchecked")
final Enumeration<String> keys = message.getPropertyNames();
while (keys.hasMoreElements()) {
String key = keys.nextElement();
if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
// skip..
if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) {
// skip transformer appended properties
} else if (key.equals(firstAcquirerKey)) {
header.setFirstAcquirer(msg.getBooleanProperty(key));
header.setFirstAcquirer(message.getBooleanProperty(key));
} else if (key.startsWith("JMSXDeliveryCount")) {
// The AMQP delivery-count field only includes prior failed delivery attempts,
// whereas JMSXDeliveryCount includes the first/current delivery attempt.
int amqpDeliveryCount = msg.getIntProperty(key) - 1;
int amqpDeliveryCount = message.getIntProperty(key) - 1;
if (amqpDeliveryCount > 0) {
header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
}
} else if (key.startsWith("JMSXUserID")) {
String value = msg.getStringProperty(key);
String value = message.getStringProperty(key);
props.setUserId(new Binary(value.getBytes("UTF-8")));
} else if (key.startsWith("JMSXGroupID")) {
String value = msg.getStringProperty(key);
String value = message.getStringProperty(key);
props.setGroupId(value);
if (apMap == null) {
apMap = new HashMap();
apMap = new HashMap<String, Object>();
}
apMap.put(key, value);
} else if (key.startsWith("JMSXGroupSeq")) {
UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
props.setGroupSequence(value);
if (apMap == null) {
apMap = new HashMap();
apMap = new HashMap<String, Object>();
}
apMap.put(key, value);
} else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
@ -267,30 +239,30 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
daMap = new HashMap<Symbol, Object>();
}
String name = key.substring(prefixDeliveryAnnotationsKey.length());
daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
} else if (key.startsWith(prefixMessageAnnotationsKey)) {
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
}
String name = key.substring(prefixMessageAnnotationsKey.length());
maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
} else if (key.equals(contentTypeKey)) {
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
props.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
} else if (key.equals(contentEncodingKey)) {
props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
} else if (key.equals(replyToGroupIDKey)) {
props.setReplyToGroupId(msg.getStringProperty(key));
props.setReplyToGroupId(message.getStringProperty(key));
} else if (key.startsWith(prefixFooterKey)) {
if (footerMap == null) {
footerMap = new HashMap();
footerMap = new HashMap<Object, Object>();
}
String name = key.substring(prefixFooterKey.length());
footerMap.put(name, msg.getObjectProperty(key));
footerMap.put(name, message.getObjectProperty(key));
} else {
if (apMap == null) {
apMap = new HashMap();
apMap = new HashMap<String, Object>();
}
apMap.put(key, msg.getObjectProperty(key));
apMap.put(key, message.getObjectProperty(key));
}
}
@ -314,6 +286,101 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
}
private Section convertBody(Message message) throws JMSException {
Section body = null;
short orignalEncoding = AMQP_UNKNOWN;
if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) {
try {
orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY);
} catch (Exception ex) {
}
}
if (message instanceof BytesMessage) {
Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message);
if (payload == null) {
payload = EMPTY_BINARY;
}
switch (orignalEncoding) {
case AMQP_NULL:
break;
case AMQP_VALUE_BINARY:
body = new AmqpValue(payload);
break;
case AMQP_DATA:
case AMQP_UNKNOWN:
default:
body = new Data(payload);
break;
}
} else if (message instanceof TextMessage) {
switch (orignalEncoding) {
case AMQP_NULL:
break;
case AMQP_DATA:
body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message));
break;
case AMQP_VALUE_STRING:
case AMQP_UNKNOWN:
default:
body = new AmqpValue(((TextMessage) message).getText());
break;
}
} else if (message instanceof MapMessage) {
body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message));
} else if (message instanceof StreamMessage) {
ArrayList<Object> list = new ArrayList<Object>();
final StreamMessage m = (StreamMessage) message;
try {
while (true) {
list.add(m.readObject());
}
} catch (MessageEOFException e) {
}
switch (orignalEncoding) {
case AMQP_SEQUENCE:
body = new AmqpSequence(list);
break;
case AMQP_VALUE_LIST:
case AMQP_UNKNOWN:
default:
body = new AmqpValue(list);
break;
}
} else if (message instanceof ObjectMessage) {
Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message);
if (payload == null) {
payload = EMPTY_BINARY;
}
switch (orignalEncoding) {
case AMQP_VALUE_BINARY:
body = new AmqpValue(payload);
break;
case AMQP_DATA:
case AMQP_UNKNOWN:
default:
body = new Data(payload);
break;
}
// For a non-AMQP message we tag the outbound content type as containing
// a serialized Java object so that an AMQP client has a hint as to what
// we are sending it.
if (!message.propertyExists(contentTypeKey)) {
vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
}
}
return body;
}
private static byte destinationType(Destination destination) {
if (destination instanceof Queue) {
if (destination instanceof TemporaryQueue) {

View File

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
public interface JMSVendor {
public abstract BytesMessage createBytesMessage();
public abstract StreamMessage createStreamMessage();
public abstract Message createMessage();
public abstract TextMessage createTextMessage();
public abstract ObjectMessage createObjectMessage();
public abstract MapMessage createMapMessage();
public abstract void setJMSXUserID(Message message, String value);
public Destination createDestination(String name);
public abstract void setJMSXGroupID(Message message, String groupId);
public abstract void setJMSXGroupSequence(Message message, int value);
public abstract void setJMSXDeliveryCount(Message message, long value);
public abstract String toAddress(Destination destination);
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -20,24 +20,25 @@ import javax.jms.Message;
public abstract class OutboundTransformer {
JMSVendor vendor;
String prefixVendor;
protected final ActiveMQJMSVendor vendor;
String prefixDeliveryAnnotations = "DA_";
String prefixMessageAnnotations= "MA_";
String prefixFooter = "FT_";
protected String prefixVendor;
String messageFormatKey;
String nativeKey;
String firstAcquirerKey;
String prefixDeliveryAnnotationsKey;
String prefixMessageAnnotationsKey;
String contentTypeKey;
String contentEncodingKey;
String replyToGroupIDKey;
String prefixFooterKey;
protected String prefixDeliveryAnnotations = "DA_";
protected String prefixMessageAnnotations= "MA_";
protected String prefixFooter = "FT_";
public OutboundTransformer(JMSVendor vendor) {
protected String messageFormatKey;
protected String nativeKey;
protected String firstAcquirerKey;
protected String prefixDeliveryAnnotationsKey;
protected String prefixMessageAnnotationsKey;
protected String contentTypeKey;
protected String contentEncodingKey;
protected String replyToGroupIDKey;
protected String prefixFooterKey;
public OutboundTransformer(ActiveMQJMSVendor vendor) {
this.vendor = vendor;
this.setPrefixVendor("JMS_AMQP_");
}
@ -56,18 +57,13 @@ public abstract class OutboundTransformer {
firstAcquirerKey = prefixVendor + "FirstAcquirer";
prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
contentTypeKey = prefixVendor +"ContentType";
contentEncodingKey = prefixVendor +"ContentEncoding";
replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
contentTypeKey = prefixVendor + "ContentType";
contentEncodingKey = prefixVendor + "ContentEncoding";
replyToGroupIDKey = prefixVendor + "ReplyToGroupID";
prefixFooterKey = prefixVendor + prefixFooter;
}
public JMSVendor getVendor() {
public ActiveMQJMSVendor getVendor() {
return vendor;
}
public void setVendor(JMSVendor vendor) {
this.vendor = vendor;
}
}

View File

@ -157,23 +157,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
InboundTransformer transformer = getTransformer();
ActiveMQMessage message = null;
while (transformer != null) {
try {
message = (ActiveMQMessage) transformer.transform(em);
break;
} catch (Exception e) {
LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName());
LOG.trace("Transformation error:", e);
transformer = transformer.getFallbackTransformer();
}
}
if (message == null) {
throw new IOException("Failed to transform incoming delivery, skipping.");
}
ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em);
current = null;

View File

@ -29,6 +29,7 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.Connection;
import javax.jms.Destination;
@ -39,6 +40,7 @@ import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -166,7 +168,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
@Test(timeout = 60000)
public void testMessagePropertiesArePreservedAMQPToOpenWire() throws Exception {
// Raw Transformer doesn't expand message propeties.
// Raw Transformer doesn't expand message properties.
assumeFalse(transformer.equals("raw"));
boolean bool = true;
@ -284,7 +286,7 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
// Now consumer the ObjectMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
assertTrue(received instanceof ObjectMessage);
assertTrue("Expected ObjectMessage but got " + received, received instanceof ObjectMessage);
ObjectMessage incoming = (ObjectMessage) received;
Object incomingObject = incoming.getObject();
@ -297,7 +299,126 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
openwire.close();
}
//----- Tests for OpenWire to Qpid JMS using ObjectMessage ---------------//
//----- Tests for OpenWire <-> Qpid JMS using ObjectMessage --------------//
@Test
public void testQpidToOpenWireObjectMessage() throws Exception {
// Raw Transformer doesn't expand message properties.
assumeFalse(!transformer.equals("jms"));
Connection openwire = createJMSConnection();
Connection amqp = createConnection();
openwire.start();
amqp.start();
Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = openwireSession.createQueue(getDestinationName());
MessageProducer amqpProducer = amqpSession.createProducer(queue);
MessageConsumer openwireConsumer = openwireSession.createConsumer(queue);
// Create and send the Message
ObjectMessage outgoing = amqpSession.createObjectMessage();
outgoing.setObject(UUID.randomUUID());
amqpProducer.send(outgoing);
// Now consumer the ObjectMessage
Message received = openwireConsumer.receive(2000);
assertNotNull(received);
LOG.info("Read new message: {}", received);
assertTrue(received instanceof ObjectMessage);
ObjectMessage incoming = (ObjectMessage) received;
Object payload = incoming.getObject();
assertNotNull(payload);
assertTrue(payload instanceof UUID);
amqp.close();
openwire.close();
}
@Test
public void testOpenWireToQpidObjectMessage() throws Exception {
// Raw Transformer doesn't expand message properties.
assumeFalse(!transformer.equals("jms"));
Connection openwire = createJMSConnection();
Connection amqp = createConnection();
openwire.start();
amqp.start();
Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = openwireSession.createQueue(getDestinationName());
MessageProducer openwireProducer = openwireSession.createProducer(queue);
MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
// Create and send the Message
ObjectMessage outgoing = amqpSession.createObjectMessage();
outgoing.setObject(UUID.randomUUID());
openwireProducer.send(outgoing);
// Now consumer the ObjectMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
LOG.info("Read new message: {}", received);
assertTrue(received instanceof ObjectMessage);
ObjectMessage incoming = (ObjectMessage) received;
Object payload = incoming.getObject();
assertNotNull(payload);
assertTrue(payload instanceof UUID);
amqp.close();
openwire.close();
}
@Test
public void testOpenWireToQpidObjectMessageWithOpenWireCompression() throws Exception {
// Raw Transformer doesn't expand message properties.
assumeFalse(!transformer.equals("jms"));
Connection openwire = createJMSConnection();
((ActiveMQConnection) openwire).setUseCompression(true);
Connection amqp = createConnection();
openwire.start();
amqp.start();
Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = openwireSession.createQueue(getDestinationName());
MessageProducer openwireProducer = openwireSession.createProducer(queue);
MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
// Create and send the Message
ObjectMessage outgoing = amqpSession.createObjectMessage();
outgoing.setObject(UUID.randomUUID());
openwireProducer.send(outgoing);
// Now consumer the ObjectMessage
Message received = amqpConsumer.receive(2000);
assertNotNull(received);
LOG.info("Read new message: {}", received);
assertTrue(received instanceof ObjectMessage);
ObjectMessage incoming = (ObjectMessage) received;
Object payload = incoming.getObject();
assertNotNull(payload);
assertTrue(payload instanceof UUID);
amqp.close();
openwire.close();
}
@SuppressWarnings("unchecked")
@Test

View File

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.junit.Test;
public class AmqpContentTypeSupportTest {
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeWithOnlyType() throws Exception {
doParseContentTypeTestImpl("type", null);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeEndsWithSlash() throws Exception {
doParseContentTypeTestImpl("type/", null);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeMissingSubtype() throws Exception {
doParseContentTypeTestImpl("type/;", null);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeEmptyString() throws Exception {
doParseContentTypeTestImpl("", null);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeNullString() throws Exception {
doParseContentTypeTestImpl(null, null);
}
@Test
public void testParseContentTypeNoParamsAfterSeparatorNonTextual() throws Exception {
// Expect null as this is not a textual type
doParseContentTypeTestImpl("type/subtype;", null);
}
@Test
public void testParseContentTypeNoParamsAfterSeparatorTextualType() throws Exception {
doParseContentTypeTestImpl("text/plain;", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeEmptyParamsAfterSeparator() throws Exception {
doParseContentTypeTestImpl("text/plain;;", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeNoParams() throws Exception {
doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithCharsetUtf8() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithCharsetAscii() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
}
@Test
public void testParseContentTypeWithMultipleParams() throws Exception {
doParseContentTypeTestImpl("text/plain; param=value; charset=us-ascii", StandardCharsets.US_ASCII);
}
@Test
public void testParseContentTypeWithCharsetQuoted() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=\"us-ascii\"", StandardCharsets.US_ASCII);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeWithCharsetQuotedEmpty() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=\"\"", null);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeWithCharsetQuoteNotClosed() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=\"unclosed", null);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeWithCharsetQuoteNotClosedEmpty() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=\"", null);
}
@Test (expected = InvalidContentTypeException.class)
public void testParseContentTypeWithNoCharsetValue() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=", null);
}
@Test
public void testParseContentTypeWithTextPlain() throws Exception {
doParseContentTypeTestImpl("text/plain;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithTextJson() throws Exception {
doParseContentTypeTestImpl("text/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("text/json;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("text/json;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("text/json", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithTextHtml() throws Exception {
doParseContentTypeTestImpl("text/html;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("text/html;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("text/html;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("text/html", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithTextFoo() throws Exception {
doParseContentTypeTestImpl("text/foo;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("text/foo;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("text/foo;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("text/foo", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationJson() throws Exception {
doParseContentTypeTestImpl("application/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("application/json;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("application/json;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("application/json", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationJsonVariant() throws Exception {
doParseContentTypeTestImpl("application/something+json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("application/something+json;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("application/something+json;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("application/something+json", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationJavascript() throws Exception {
doParseContentTypeTestImpl("application/javascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("application/javascript;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("application/javascript;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("application/javascript", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationEcmascript() throws Exception {
doParseContentTypeTestImpl("application/ecmascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("application/ecmascript;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("application/ecmascript;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("application/ecmascript", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationXml() throws Exception {
doParseContentTypeTestImpl("application/xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("application/xml;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("application/xml;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("application/xml", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationXmlVariant() throws Exception {
doParseContentTypeTestImpl("application/something+xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("application/something+xml;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("application/something+xml;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("application/something+xml", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationXmlDtd() throws Exception {
doParseContentTypeTestImpl("application/xml-dtd;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doParseContentTypeTestImpl("application/xml-dtd;charset=us-ascii", StandardCharsets.US_ASCII);
doParseContentTypeTestImpl("application/xml-dtd;charset=utf-8", StandardCharsets.UTF_8);
doParseContentTypeTestImpl("application/xml-dtd", StandardCharsets.UTF_8);
}
@Test
public void testParseContentTypeWithApplicationOtherNotTextual() throws Exception {
// Expect null as this is not a textual type
doParseContentTypeTestImpl("application/other", null);
}
@Test
public void testParseContentTypeWithApplicationOctetStream() throws Exception {
// Expect null as this is not a textual type
doParseContentTypeTestImpl(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE, null);
}
@Test
public void testParseContentTypeWithApplicationJavaSerialized() throws Exception {
// Expect null as this is not a textual type
doParseContentTypeTestImpl(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, null);
}
private void doParseContentTypeTestImpl(String contentType, Charset expected) throws InvalidContentTypeException {
Charset charset = AmqpContentTypeSupport.parseContentTypeForTextualCharset(contentType);
if (expected == null) {
assertNull("Expected no charset, but got:" + charset, charset);
} else {
assertEquals("Charset not as expected", expected, charset);
}
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
public class AmqpMessageSupportTest {
//---------- getSymbol ---------------------------------------------------//
@Test
public void testGetSymbol() {
assertNotNull(AmqpMessageSupport.getSymbol("x-opt-something-or-other"));
}
//---------- getMessageAnnotation ----------------------------------------//
@Test
public void testGetMessageAnnotationWhenMessageHasAnnotationsMap() {
Map<Symbol, Object> messageAnnotationsMap = new HashMap<Symbol,Object>();
messageAnnotationsMap.put(Symbol.valueOf("x-opt-test"), Boolean.TRUE);
Message message = Proton.message();
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
assertNotNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", message));
}
@Test
public void testGetMessageAnnotationWhenMessageHasEmptyAnnotationsMap() {
Map<Symbol, Object> messageAnnotationsMap = new HashMap<Symbol,Object>();
Message message = Proton.message();
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
assertNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", message));
}
@Test
public void testGetMessageAnnotationWhenMessageHasNoAnnotationsMap() {
Message message = Proton.message();
assertNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", message));
}
@Test
public void testGetMessageAnnotationWhenMessageIsNull() {
assertNull(AmqpMessageSupport.getMessageAnnotation("x-opt-test", null));
}
//---------- isContentType -----------------------------------------------//
@Test
public void testIsContentTypeWithNullStringValueAndNullMessageContentType() {
Message message = Proton.message();
assertTrue(AmqpMessageSupport.isContentType(null, message));
}
@Test
public void testIsContentTypeWithNonNullStringValueAndNullMessageContentType() {
Message message = Proton.message();
assertFalse(AmqpMessageSupport.isContentType("test", message));
}
@Test
public void testIsContentTypeWithNonNullStringValueAndNonNullMessageContentTypeNotEqual() {
Message message = Proton.message();
message.setContentType("fails");
assertFalse(AmqpMessageSupport.isContentType("test", message));
}
@Test
public void testIsContentTypeWithNonNullStringValueAndNonNullMessageContentTypeEqual() {
Message message = Proton.message();
message.setContentType("test");
assertTrue(AmqpMessageSupport.isContentType("test", message));
}
@Test
public void testIsContentTypeWithNullStringValueAndNonNullMessageContentType() {
Message message = Proton.message();
message.setContentType("test");
assertFalse(AmqpMessageSupport.isContentType(null, message));
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -16,11 +16,18 @@
*/
package org.apache.activemq.transport.amqp.message;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.Queue;
@ -29,8 +36,18 @@ import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
@ -38,26 +55,513 @@ import org.mockito.Mockito;
public class JMSMappingInboundTransformerTest {
//----- Null Body Section ------------------------------------------------//
/**
* Test that a message with no body section, but with the content type set to
* {@value AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} results in a BytesMessage
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
TextMessage mockTextMessage = createMockTextMessage();
JMSVendor mockVendor = createMockVendor(mockTextMessage);
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception {
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
String contentString = "myTextMessageContent";
Message amqp = Message.Factory.create();
amqp.setBody(new AmqpValue(contentString));
Message message = Message.Factory.create();
message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
EncodedMessage em = encodeMessage(amqp);
EncodedMessage em = encodeMessage(message);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
/**
* Test that a message with no body section, and no content-type results in a BytesMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception {
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
Message message = Message.Factory.create();
EncodedMessage em = encodeMessage(message);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
/**
* Test that a message with no body section, but with the content type set to
* {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an ObjectMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception {
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
Message message = Message.Factory.create();
message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
EncodedMessage em = encodeMessage(message);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQObjectMessage.class, jmsMessage.getClass());
}
@Test
public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception {
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
Message message = Message.Factory.create();
message.setContentType("text/plain");
EncodedMessage em = encodeMessage(message);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
}
/**
* Test that a message with no body section, and with the content type set to
* an unknown value results in a plain Message when not otherwise annotated to
* indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception {
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
Message message = Message.Factory.create();
message.setContentType("unknown-content-type");
EncodedMessage em = encodeMessage(message);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQMessage.class, jmsMessage.getClass());
}
//----- Data Body Section ------------------------------------------------//
/**
* Test that a data body containing nothing, but with the content type set to
* {@value AmqpMessageSupport#OCTET_STREAM_CONTENT_TYPE} results in a BytesMessage when not
* otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndContentType() throws Exception {
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE);
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
/**
* Test that a message with an empty data body section, and with the content type
* set to an unknown value results in a BytesMessage when not otherwise annotated
* to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
public void testCreateBytesMessageFromDataWithUnknownContentType() throws Exception {
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType("unknown-content-type");
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
/**
* Test that a receiving a data body containing nothing and no content type being set
* results in a BytesMessage when not otherwise annotated to indicate the type of
* JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateBytesMessageFromDataWithEmptyBinaryAndNoContentType() throws Exception {
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
assertNull(message.getContentType());
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
/**
* Test that receiving a data body containing nothing, but with the content type set to
* {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an ObjectMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateObjectMessageFromDataWithContentTypeAndEmptyBinary() throws Exception {
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQObjectMessage.class, jmsMessage.getClass());
}
@Test
public void testCreateTextMessageFromDataWithContentTypeTextPlain() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/plain", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeTextJson() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("text/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/json;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/json;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/json", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeTextHtml() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("text/html;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/html;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/html;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/html", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeTextFoo() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("text/foo", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeApplicationJson() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("application/json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/json;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/json;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/json", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeApplicationJsonVariant() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+json", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeApplicationJavascript() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/javascript", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeApplicationEcmascript() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/ecmascript", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeApplicationXml() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeApplicationXmlVariant() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/something+xml", StandardCharsets.UTF_8);
}
@Test
public void testCreateTextMessageFromDataWithContentTypeApplicationXmlDtd() throws Exception {
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd;charset=iso-8859-1", StandardCharsets.ISO_8859_1);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd;charset=us-ascii", StandardCharsets.US_ASCII);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd;charset=utf-8", StandardCharsets.UTF_8);
doCreateTextMessageFromDataWithContentTypeTestImpl("application/xml-dtd", StandardCharsets.UTF_8);
}
private void doCreateTextMessageFromDataWithContentTypeTestImpl(String contentType, Charset expectedCharset) throws Exception {
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new Data(binary));
message.setContentType(contentType);
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
if (StandardCharsets.UTF_8.equals(expectedCharset)) {
assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
} else {
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
}
//----- AmqpValue transformations ----------------------------------------//
/**
* Test that an amqp-value body containing a string results in a TextMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateTextMessageFromAmqpValueWithString() throws Exception {
Message message = Proton.message();
message.setBody(new AmqpValue("content"));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
}
/**
* Test that an amqp-value body containing a null results in an TextMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateTextMessageFromAmqpValueWithNull() throws Exception {
Message message = Proton.message();
message.setBody(new AmqpValue(null));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
}
/**
* Test that a message with an AmqpValue section containing a Binary, but with the content type
* set to {@value AmqpMessageSupport#SERIALIZED_JAVA_OBJECT_CONTENT_TYPE} results in an ObjectMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception {
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
Message message = Message.Factory.create();
message.setBody(new AmqpValue(new Binary(new byte[0])));
message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
EncodedMessage em = encodeMessage(message);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQObjectMessage.class, jmsMessage.getClass());
}
/**
* Test that an amqp-value body containing a map results in an MapMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateAmqpMapMessageFromAmqpValueWithMap() throws Exception {
Message message = Proton.message();
Map<String, String> map = new HashMap<String,String>();
message.setBody(new AmqpValue(map));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQMapMessage.class, jmsMessage.getClass());
}
/**
* Test that an amqp-value body containing a list results in an StreamMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateAmqpStreamMessageFromAmqpValueWithList() throws Exception {
Message message = Proton.message();
List<String> list = new ArrayList<String>();
message.setBody(new AmqpValue(list));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQStreamMessage.class, jmsMessage.getClass());
}
/**
* Test that an amqp-sequence body containing a list results in an StreamMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateAmqpStreamMessageFromAmqpSequence() throws Exception {
Message message = Proton.message();
List<String> list = new ArrayList<String>();
message.setBody(new AmqpSequence(list));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQStreamMessage.class, jmsMessage.getClass());
}
/**
* Test that an amqp-value body containing a binary value results in BytesMessage
* when not otherwise annotated to indicate the type of JMS message it is.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateAmqpBytesMessageFromAmqpValueWithBinary() throws Exception {
Message message = Proton.message();
Binary binary = new Binary(new byte[0]);
message.setBody(new AmqpValue(binary));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
/**
* Test that an amqp-value body containing a value which can't be categorized results in
* an exception from the transformer and then try the transformer's own fallback transformer
* to result in an BytesMessage.
*
* @throws Exception if an error occurs during the test.
*/
@Test
public void testCreateBytesMessageFromAmqpValueWithUncategorisedContent() throws Exception {
Message message = Proton.message();
message.setBody(new AmqpValue(UUID.randomUUID()));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
Mockito.verify(mockTextMessage).setText(contentString);
assertSame("Expected provided mock message, got a different one", mockTextMessage, jmsMessage);
assertNotNull("Message should not be null", jmsMessage);
assertEquals("Unexpected message class type", ActiveMQBytesMessage.class, jmsMessage.getClass());
}
// ======= JMSDestination Handling =========
@Test
public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
String contentString = "myTextMessageContent";
Message message = Message.Factory.create();
message.setBody(new AmqpValue(contentString));
EncodedMessage em = encodeMessage(message);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor);
javax.jms.Message jmsMessage = transformer.transform(em);
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
assertEquals("Unexpected message class type", ActiveMQTextMessage.class, jmsMessage.getClass());
TextMessage textMessage = (TextMessage) jmsMessage;
assertNotNull(textMessage.getText());
assertEquals(contentString, textMessage.getText());
}
//----- Destination Conversions ------------------------------------------//
@Test
public void testTransformWithNoToTypeDestinationTypeAnnotation() throws Exception {
@ -85,8 +589,8 @@ public class JMSMappingInboundTransformerTest {
}
private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
TextMessage mockTextMessage = createMockTextMessage();
JMSVendor mockVendor = createMockVendor(mockTextMessage);
ActiveMQTextMessage mockTextMessage = createMockTextMessage();
ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
String toAddress = "toAddress";
@ -111,7 +615,7 @@ public class JMSMappingInboundTransformerTest {
// Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
}
// ======= JMSReplyTo Handling =========
//----- ReplyTo Conversions ----------------------------------------------//
@Test
public void testTransformWithNoReplyToTypeDestinationTypeAnnotation() throws Exception {
@ -139,8 +643,8 @@ public class JMSMappingInboundTransformerTest {
}
private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass) throws Exception {
TextMessage mockTextMessage = createMockTextMessage();
JMSVendor mockVendor = createMockVendor(mockTextMessage);
ActiveMQTextMessage mockTextMessage = createMockTextMessage();
ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage);
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
String replyToAddress = "replyToAddress";
@ -165,21 +669,24 @@ public class JMSMappingInboundTransformerTest {
// Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
}
// ======= Utility Methods =========
//----- Utility Methods --------------------------------------------------//
private TextMessage createMockTextMessage() {
TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
return mockTextMessage;
private ActiveMQTextMessage createMockTextMessage() {
return Mockito.mock(ActiveMQTextMessage.class);
}
private JMSVendor createMockVendor(TextMessage mockTextMessage) {
JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
private ActiveMQJMSVendor createMockVendor(ActiveMQTextMessage mockTextMessage) {
ActiveMQJMSVendor mockVendor = Mockito.mock(ActiveMQJMSVendor.class);
Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
Mockito.when(mockVendor.createTextMessage(Mockito.any(String.class))).thenReturn(mockTextMessage);
return mockVendor;
}
private ActiveMQJMSVendor createVendor() {
return ActiveMQJMSVendor.INSTANCE;
}
private EncodedMessage encodeMessage(Message message) {
byte[] encodeBuffer = new byte[1024 * 8];
int encodedSize;

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -16,6 +16,12 @@
*/
package org.apache.activemq.transport.amqp.message;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.QUEUE_TYPE;
@ -23,22 +29,40 @@ import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTrans
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.TEMP_TOPIC_TYPE;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.TOPIC_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
@ -46,23 +70,581 @@ import org.mockito.Mockito;
public class JMSMappingOutboundTransformerTest {
//----- no-body Message type tests ---------------------------------------//
@Test
public void testConvertMessageWithTextMessageCreatesAmqpValueStringBody() throws Exception {
public void testConvertMessageToAmqpMessageWithNoBody() throws Exception {
ActiveMQMessage outbound = createMessage();
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNull(amqp.getBody());
}
@Test
public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception {
ActiveMQTextMessage outbound = createTextMessage();
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNull(amqp.getBody());
}
//----- BytesMessage type tests ---------------------------------------//
@Test
public void testConvertEmptyBytesMessageToAmqpMessageWithDataBody() throws Exception {
ActiveMQBytesMessage outbound = createBytesMessage();
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
}
@Test
public void testConvertUncompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
ActiveMQBytesMessage outbound = createBytesMessage();
outbound.writeBytes(expectedPayload);
outbound.storeContent();
outbound.onSend();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
Binary amqpData = ((Data) amqp.getBody()).getValue();
Binary inputData = new Binary(expectedPayload);
assertTrue(inputData.equals(amqpData));
}
@Test
public void testConvertCompressedBytesMessageToAmqpMessageWithDataBody() throws Exception {
byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
ActiveMQBytesMessage outbound = createBytesMessage(true);
outbound.writeBytes(expectedPayload);
outbound.storeContent();
outbound.onSend();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
assertEquals(4, ((Data) amqp.getBody()).getValue().getLength());
Binary amqpData = ((Data) amqp.getBody()).getValue();
Binary inputData = new Binary(expectedPayload);
assertTrue(inputData.equals(amqpData));
}
@Test
public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQBytesMessage outbound = createBytesMessage();
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
assertEquals(0, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
}
@Test
public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
ActiveMQBytesMessage outbound = createBytesMessage();
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
outbound.writeBytes(expectedPayload);
outbound.storeContent();
outbound.onSend();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue();
Binary inputData = new Binary(expectedPayload);
assertTrue(inputData.equals(amqpData));
}
@Test
public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception {
byte[] expectedPayload = new byte[] { 8, 16, 24, 32 };
ActiveMQBytesMessage outbound = createBytesMessage(true);
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
outbound.writeBytes(expectedPayload);
outbound.storeContent();
outbound.onSend();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Binary);
assertEquals(4, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
Binary amqpData = (Binary) ((AmqpValue) amqp.getBody()).getValue();
Binary inputData = new Binary(expectedPayload);
assertTrue(inputData.equals(amqpData));
}
//----- MapMessage type tests --------------------------------------------//
@Test
public void testConvertMapMessageToAmqpMessageWithNoBody() throws Exception {
ActiveMQMapMessage outbound = createMapMessage();
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
}
@Test
public void testConvertMapMessageToAmqpMessage() throws Exception {
ActiveMQMapMessage outbound = createMapMessage();
outbound.setString("property-1", "string");
outbound.setInt("property-2", 1);
outbound.setBoolean("property-3", true);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
assertEquals(3, amqpMap.size());
assertTrue("string".equals(amqpMap.get("property-1")));
}
@Test
public void testConvertCompressedMapMessageToAmqpMessage() throws Exception {
ActiveMQMapMessage outbound = createMapMessage(true);
outbound.setString("property-1", "string");
outbound.setInt("property-2", 1);
outbound.setBoolean("property-3", true);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
assertEquals(3, amqpMap.size());
assertTrue("string".equals(amqpMap.get("property-1")));
}
//----- StreamMessage type tests -----------------------------------------//
@Test
public void testConvertStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQStreamMessage outbound = createStreamMessage();
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
}
@Test
public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
ActiveMQStreamMessage outbound = createStreamMessage();
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List);
}
@Test
public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQStreamMessage outbound = createStreamMessage(true);
outbound.writeBoolean(false);
outbound.writeString("test");
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof List);
@SuppressWarnings("unchecked")
List<Object> amqpList = (List<Object>) ((AmqpValue) amqp.getBody()).getValue();
assertEquals(2, amqpList.size());
}
@Test
public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception {
ActiveMQStreamMessage outbound = createStreamMessage(true);
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE);
outbound.writeBoolean(false);
outbound.writeString("test");
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpSequence);
assertTrue(((AmqpSequence) amqp.getBody()).getValue() instanceof List);
@SuppressWarnings("unchecked")
List<Object> amqpList = ((AmqpSequence) amqp.getBody()).getValue();
assertEquals(2, amqpList.size());
}
//----- ObjectMessage type tests -----------------------------------------//
@Test
public void testConvertEmptyObjectMessageToAmqpMessageWithDataBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage();
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
}
@Test
public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage();
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertEquals(0, ((Data) amqp.getBody()).getValue().getLength());
}
@Test
public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage();
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue)amqp.getBody()).getValue() instanceof Binary);
assertEquals(0, ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
}
@Test
public void testConvertObjectMessageToAmqpMessageWithDataBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
assertNotNull(value);
assertTrue(value instanceof UUID);
}
@Test
public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
assertNotNull(value);
assertTrue(value instanceof UUID);
}
@Test
public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID());
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue)amqp.getBody()).getValue() instanceof Binary);
assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
assertNotNull(value);
assertTrue(value instanceof UUID);
}
@Test
public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
assertNotNull(value);
assertTrue(value instanceof UUID);
}
@Test
public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertFalse(0 == ((Data) amqp.getBody()).getValue().getLength());
Object value = deserialize(((Data) amqp.getBody()).getValue().getArray());
assertNotNull(value);
assertTrue(value instanceof UUID);
}
@Test
public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception {
ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true);
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue)amqp.getBody()).getValue() instanceof Binary);
assertFalse(0 == ((Binary) ((AmqpValue) amqp.getBody()).getValue()).getLength());
Object value = deserialize(((Binary) ((AmqpValue) amqp.getBody()).getValue()).getArray());
assertNotNull(value);
assertTrue(value instanceof UUID);
}
//----- TextMessage type tests -------------------------------------------//
@Test
public void testConvertTextMessageToAmqpMessageWithNoBody() throws Exception {
ActiveMQTextMessage outbound = createTextMessage();
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertNull(((AmqpValue) amqp.getBody()).getValue());
}
@Test
public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception {
String contentString = "myTextMessageContent";
TextMessage mockTextMessage = createMockTextMessage();
Mockito.when(mockTextMessage.getText()).thenReturn(contentString);
JMSVendor mockVendor = createMockVendor();
ActiveMQTextMessage outbound = createTextMessage(contentString);
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
outbound.onSend();
outbound.storeContent();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(mockTextMessage);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
Binary data = ((Data) amqp.getBody()).getValue();
String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
assertEquals(contentString, contents);
}
@Test
public void testConvertTextMessageCreatesAmqpValueStringBody() throws Exception {
String contentString = "myTextMessageContent";
ActiveMQTextMessage outbound = createTextMessage(contentString);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue());
}
// ======= JMSDestination Handling =========
@Test
public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception {
String contentString = "myTextMessageContent";
ActiveMQTextMessage outbound = createTextMessage(contentString, true);
outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA);
outbound.onSend();
outbound.storeContent();
ActiveMQJMSVendor vendor = createVendor();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor);
Message amqp = transformer.convert(outbound);
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof Data);
assertTrue(((Data) amqp.getBody()).getValue() instanceof Binary);
Binary data = ((Data) amqp.getBody()).getValue();
String contents = new String(data.getArray(), data.getArrayOffset(), data.getLength(), StandardCharsets.UTF_8);
assertEquals(contentString, contents);
}
//----- Test JMSDestination Handling -------------------------------------//
@Test
public void testConvertMessageWithJMSDestinationNull() throws Exception {
@ -98,10 +680,10 @@ public class JMSMappingOutboundTransformerTest {
}
private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception {
TextMessage mockTextMessage = createMockTextMessage();
ActiveMQTextMessage mockTextMessage = createMockTextMessage();
Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination);
JMSVendor mockVendor = createMockVendor();
ActiveMQJMSVendor mockVendor = createMockVendor();
String toAddress = "someToAddress";
if (jmsDestination != null) {
Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(toAddress);
@ -125,7 +707,7 @@ public class JMSMappingOutboundTransformerTest {
}
}
// ======= JMSReplyTo Handling =========
//----- Test JMSReplyTo Handling -----------------------------------------//
@Test
public void testConvertMessageWithJMSReplyToNull() throws Exception {
@ -161,10 +743,10 @@ public class JMSMappingOutboundTransformerTest {
}
private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
TextMessage mockTextMessage = createMockTextMessage();
ActiveMQTextMessage mockTextMessage = createMockTextMessage();
Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo);
JMSVendor mockVendor = createMockVendor();
ActiveMQJMSVendor mockVendor = createMockVendor();
String replyToAddress = "someReplyToAddress";
if (jmsReplyTo != null) {
Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(replyToAddress);
@ -188,18 +770,139 @@ public class JMSMappingOutboundTransformerTest {
}
}
// ======= Utility Methods =========
//----- Utility Methods used for this Test -------------------------------//
private TextMessage createMockTextMessage() throws Exception {
TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
private ActiveMQTextMessage createMockTextMessage() throws Exception {
ActiveMQTextMessage mockTextMessage = Mockito.mock(ActiveMQTextMessage.class);
Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet()));
return mockTextMessage;
}
private JMSVendor createMockVendor() {
JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
private ActiveMQJMSVendor createVendor() {
return ActiveMQJMSVendor.INSTANCE;
}
return mockVendor;
private ActiveMQJMSVendor createMockVendor() {
return Mockito.mock(ActiveMQJMSVendor.class);
}
private ActiveMQMessage createMessage() {
return new ActiveMQMessage();
}
private ActiveMQBytesMessage createBytesMessage() {
return createBytesMessage(false);
}
private ActiveMQBytesMessage createBytesMessage(boolean compression) {
ActiveMQBytesMessage message = new ActiveMQBytesMessage();
if (compression) {
ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
Mockito.when(connection.isUseCompression()).thenReturn(true);
message.setConnection(connection);
}
return message;
}
private ActiveMQMapMessage createMapMessage() {
return createMapMessage(false);
}
private ActiveMQMapMessage createMapMessage(boolean compression) {
ActiveMQMapMessage message = new ActiveMQMapMessage();
if (compression) {
ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
Mockito.when(connection.isUseCompression()).thenReturn(true);
message.setConnection(connection);
}
return message;
}
private ActiveMQStreamMessage createStreamMessage() {
return createStreamMessage(false);
}
private ActiveMQStreamMessage createStreamMessage(boolean compression) {
ActiveMQStreamMessage message = new ActiveMQStreamMessage();
if (compression) {
ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
Mockito.when(connection.isUseCompression()).thenReturn(true);
message.setConnection(connection);
}
return message;
}
private ActiveMQObjectMessage createObjectMessage() {
return createObjectMessage(null);
}
private ActiveMQObjectMessage createObjectMessage(Serializable payload) {
return createObjectMessage(payload, false);
}
private ActiveMQObjectMessage createObjectMessage(Serializable payload, boolean compression) {
ActiveMQObjectMessage result = new ActiveMQObjectMessage();
if (compression) {
ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
Mockito.when(connection.isUseCompression()).thenReturn(true);
result.setConnection(connection);
}
try {
result.setObject(payload);
} catch (JMSException ex) {
throw new AssertionError("Should not fail to setObject in this test");
}
result = Mockito.spy(result);
try {
Mockito.doThrow(new AssertionError("invalid setObject")).when(result).setObject(Mockito.any(Serializable.class));
Mockito.doThrow(new AssertionError("invalid getObject")).when(result).getObject();
} catch (JMSException e) {
}
return result;
}
private ActiveMQTextMessage createTextMessage() {
return createTextMessage(null);
}
private ActiveMQTextMessage createTextMessage(String text) {
return createTextMessage(text, false);
}
private ActiveMQTextMessage createTextMessage(String text, boolean compression) {
ActiveMQTextMessage result = new ActiveMQTextMessage();
if (compression) {
ActiveMQConnection connection = Mockito.mock(ActiveMQConnection.class);
Mockito.when(connection.isUseCompression()).thenReturn(true);
result.setConnection(connection);
}
try {
result.setText(text);
} catch (JMSException e) {
}
return result;
}
private Object deserialize(byte[] payload) throws Exception {
try (ByteArrayInputStream bis = new ByteArrayInputStream(payload);
ObjectInputStream ois = new ObjectInputStream(bis);) {
return ois.readObject();
}
}
}

View File

@ -0,0 +1,312 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.amqp.JMSInteroperabilityTest;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.ProtonJMessage;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Some simple performance tests for the Message Transformers.
*/
@Ignore("Turn on to profile.")
@RunWith(Parameterized.class)
public class JMSTransformationSpeedComparisonTest {
protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class);
private final String transformer;
private final int WARM_CYCLES = 10;
private final int PROFILE_CYCLES = 1000000;
public JMSTransformationSpeedComparisonTest(String transformer) {
this.transformer = transformer;
}
@Parameters(name="Transformer->{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"jms"},
{"native"},
{"raw"},
});
}
private InboundTransformer getInboundTransformer() {
switch (transformer) {
case "raw":
return new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
case "native":
return new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
default:
return new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
}
}
private OutboundTransformer getOutboundTransformer() {
switch (transformer) {
case "raw":
case "native":
return new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
default:
return new JMSMappingOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
}
}
@Test
public void testBodyOnlyMessage() throws Exception {
Message message = Proton.message();
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
EncodedMessage encoded = encode(message);
InboundTransformer inboundTransformer = getInboundTransformer();
OutboundTransformer outboundTransformer = getOutboundTransformer();
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
intermediate.onSend();
outboundTransformer.transform(intermediate);
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
intermediate.onSend();
outboundTransformer.transform(intermediate);
}
totalDuration += System.nanoTime() - startTime;
LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
}
@Test
public void testMessageWithNoPropertiesOrAnnotations() throws Exception {
Message message = Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
message.setCreationTime(System.currentTimeMillis());
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
EncodedMessage encoded = encode(message);
InboundTransformer inboundTransformer = getInboundTransformer();
OutboundTransformer outboundTransformer = getOutboundTransformer();
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
intermediate.onSend();
outboundTransformer.transform(intermediate);
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
intermediate.onSend();
outboundTransformer.transform(intermediate);
}
totalDuration += System.nanoTime() - startTime;
LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
}
@Test
public void testTypicalQpidJMSMessage() throws Exception {
Map<String, Object> applicationProperties = new HashMap<String, Object>();
Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
applicationProperties.put("property-1", "string");
applicationProperties.put("property-2", 512);
applicationProperties.put("property-3", true);
messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
Message message = Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
message.setApplicationProperties(new ApplicationProperties(applicationProperties));
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
message.setCreationTime(System.currentTimeMillis());
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
EncodedMessage encoded = encode(message);
InboundTransformer inboundTransformer = getInboundTransformer();
OutboundTransformer outboundTransformer = getOutboundTransformer();
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
intermediate.onSend();
outboundTransformer.transform(intermediate);
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded);
intermediate.onSend();
outboundTransformer.transform(intermediate);
}
totalDuration += System.nanoTime() - startTime;
LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
}
@Test
public void testTypicalQpidJMSMessageInBoundOnly() throws Exception {
Map<String, Object> applicationProperties = new HashMap<String, Object>();
Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
applicationProperties.put("property-1", "string");
applicationProperties.put("property-2", 512);
applicationProperties.put("property-3", true);
messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
Message message = Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
message.setApplicationProperties(new ApplicationProperties(applicationProperties));
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
message.setCreationTime(System.currentTimeMillis());
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
EncodedMessage encoded = encode(message);
InboundTransformer inboundTransformer = getInboundTransformer();
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
inboundTransformer.transform(encoded);
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
inboundTransformer.transform(encoded);
}
totalDuration += System.nanoTime() - startTime;
LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
}
@Test
public void testTypicalQpidJMSMessageOutBoundOnly() throws Exception {
Map<String, Object> applicationProperties = new HashMap<String, Object>();
Map<Symbol, Object> messageAnnotations = new HashMap<Symbol, Object>();
applicationProperties.put("property-1", "string");
applicationProperties.put("property-2", 512);
applicationProperties.put("property-3", true);
messageAnnotations.put(Symbol.valueOf("x-opt-jms-msg-type"), 0);
Message message = Proton.message();
message.setAddress("queue://test-queue");
message.setDeliveryCount(1);
message.setApplicationProperties(new ApplicationProperties(applicationProperties));
message.setMessageAnnotations(new MessageAnnotations(messageAnnotations));
message.setCreationTime(System.currentTimeMillis());
message.setContentType("text/plain");
message.setBody(new AmqpValue("String payload for AMQP message conversion performance testing."));
EncodedMessage encoded = encode(message);
InboundTransformer inboundTransformer = getInboundTransformer();
OutboundTransformer outboundTransformer = getOutboundTransformer();
ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded);
outbound.onSend();
// Warm up
for (int i = 0; i < WARM_CYCLES; ++i) {
outboundTransformer.transform(outbound);
}
long totalDuration = 0;
long startTime = System.nanoTime();
for (int i = 0; i < PROFILE_CYCLES; ++i) {
outboundTransformer.transform(outbound);
}
totalDuration += System.nanoTime() - startTime;
LOG.info("[{}] Total time for {} cycles of transforms = {} ms",
transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration));
}
private EncodedMessage encode(Message message) {
ProtonJMessage amqp = (ProtonJMessage) message;
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
if (overflow.position() > 0) {
buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
}
return new EncodedMessage(1, buffer.array(), 0, c);
}
}