diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java index 65cd657323..b5429e63aa 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java @@ -16,14 +16,10 @@ */ package org.apache.activemq.transport.amqp.message; -import javax.jms.Message; +import org.apache.activemq.command.ActiveMQMessage; public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { - public AMQPNativeInboundTransformer(ActiveMQJMSVendor vendor) { - super(vendor); - } - @Override public String getTransformerName() { return TRANSFORMER_NATIVE; @@ -31,14 +27,14 @@ public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { @Override public InboundTransformer getFallbackTransformer() { - return new AMQPRawInboundTransformer(getVendor()); + return new AMQPRawInboundTransformer(); } @Override - protected Message doTransform(EncodedMessage amqpMessage) throws Exception { + protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception { org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); - Message result = super.doTransform(amqpMessage); + ActiveMQMessage result = super.doTransform(amqpMessage); populateMessage(result, amqp); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java index 620b79b891..cbc34619b6 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java @@ -16,93 +16,73 @@ */ package org.apache.activemq.transport.amqp.message; -import java.nio.ByteBuffer; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody; -import javax.jms.BytesMessage; import javax.jms.JMSException; -import javax.jms.Message; import javax.jms.MessageFormatException; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.codec.CompositeWritableBuffer; -import org.apache.qpid.proton.codec.DroppingWritableBuffer; -import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.ProtonJMessage; -public class AMQPNativeOutboundTransformer extends OutboundTransformer { - - public AMQPNativeOutboundTransformer(ActiveMQJMSVendor vendor) { - super(vendor); - } +public class AMQPNativeOutboundTransformer implements OutboundTransformer { @Override - public EncodedMessage transform(Message msg) throws Exception { - if (msg == null || !(msg instanceof BytesMessage)) { + public EncodedMessage transform(ActiveMQMessage message) throws Exception { + if (message == null || !(message instanceof ActiveMQBytesMessage)) { return null; } - try { - if (!msg.getBooleanProperty(prefixVendor + "NATIVE")) { - return null; - } - } catch (MessageFormatException e) { - return null; - } - - return transform(this, (BytesMessage) msg); + return transform(this, (ActiveMQBytesMessage) message); } - static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { + static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException { long messageFormat; try { - messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT"); + messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT); } catch (MessageFormatException e) { return null; } - byte data[] = new byte[(int) msg.getBodyLength()]; - int dataSize = data.length; - msg.readBytes(data); - msg.reset(); - try { - int count = msg.getIntProperty("JMSXDeliveryCount"); - if (count > 1) { + Binary encodedMessage = getBinaryFromMessageBody(message); + byte encodedData[] = encodedMessage.getArray(); + int encodedSize = encodedMessage.getLength(); - // decode... - ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); - int offset = 0; - int len = data.length; - while (len > 0) { - final int decoded = amqp.decode(data, offset, len); - assert decoded > 0 : "Make progress decoding the message"; - offset += decoded; - len -= decoded; - } + int count = message.getRedeliveryCounter(); + if (count >= 1) { - // Update the DeliveryCount header... - // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. - if (amqp.getHeader() == null) { - amqp.setHeader(new Header()); - } - - amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1)); - - // Re-encode... - ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]); - final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); - int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); - if (overflow.position() > 0) { - buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); - c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); - } - data = buffer.array(); - dataSize = c; + // decode... + ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); + int offset = 0; + int len = encodedSize; + while (len > 0) { + final int decoded = amqp.decode(encodedData, offset, len); + assert decoded > 0 : "Make progress decoding the message"; + offset += decoded; + len -= decoded; } - } catch (JMSException e) { + + // Update the DeliveryCount header... + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. + if (amqp.getHeader() == null) { + amqp.setHeader(new Header()); + } + + amqp.getHeader().setDeliveryCount(new UnsignedInteger(count)); + + // Re-encode... + final AmqpWritableBuffer buffer = new AmqpWritableBuffer(); + int written = amqp.encode(buffer); + + encodedData = buffer.getArray(); + encodedSize = written; } - return new EncodedMessage(messageFormat, data, 0, dataSize); + return new EncodedMessage(messageFormat, encodedData, 0, encodedSize); } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java index c534709eb7..b4d3ad66e8 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java @@ -16,15 +16,16 @@ */ package org.apache.activemq.transport.amqp.message; -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_NATIVE; + import javax.jms.Message; -public class AMQPRawInboundTransformer extends InboundTransformer { +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.util.ByteSequence; - public AMQPRawInboundTransformer(ActiveMQJMSVendor vendor) { - super(vendor); - } +public class AMQPRawInboundTransformer extends InboundTransformer { @Override public String getTransformerName() { @@ -37,22 +38,23 @@ public class AMQPRawInboundTransformer extends InboundTransformer { } @Override - protected Message doTransform(EncodedMessage amqpMessage) throws Exception { - BytesMessage result = vendor.createBytesMessage(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); + protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception { + ActiveMQBytesMessage result = new ActiveMQBytesMessage(); + result.setContent(new ByteSequence(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength())); // We cannot decode the message headers to check so err on the side of caution // and mark all messages as persistent. - result.setJMSDeliveryMode(DeliveryMode.PERSISTENT); - result.setJMSPriority(defaultPriority); + result.setPersistent(true); + result.setPriority((byte) Message.DEFAULT_PRIORITY); final long now = System.currentTimeMillis(); - result.setJMSTimestamp(now); - if (defaultTtl > 0) { - result.setJMSExpiration(now + defaultTtl); + result.setTimestamp(now); + + if (amqpMessage.getMessageFormat() != 0) { + result.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat()); } - result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); - result.setBooleanProperty(prefixVendor + "NATIVE", true); + result.setBooleanProperty(JMS_AMQP_NATIVE, true); return result; } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java deleted file mode 100644 index 9b5a4ab015..0000000000 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.message; - -import java.io.DataInputStream; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.zip.InflaterInputStream; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageNotWriteableException; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.TextMessage; - -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMapMessage; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ActiveMQObjectMessage; -import org.apache.activemq.command.ActiveMQStreamMessage; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.transport.amqp.AmqpProtocolException; -import org.apache.activemq.util.ByteArrayInputStream; -import org.apache.activemq.util.ByteArrayOutputStream; -import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.util.JMSExceptionSupport; -import org.apache.qpid.proton.amqp.Binary; - -public class ActiveMQJMSVendor { - - final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor(); - - private ActiveMQJMSVendor() { - } - - /** - * @return a new vendor specific Message instance. - */ - public Message createMessage() { - return new ActiveMQMessage(); - } - - /** - * @return a new vendor specific BytesMessage instance. - */ - public BytesMessage createBytesMessage() { - return new ActiveMQBytesMessage(); - } - - /** - * @return a new vendor specific BytesMessage instance with the given payload. - */ - public BytesMessage createBytesMessage(byte[] content, int offset, int length) { - ActiveMQBytesMessage message = new ActiveMQBytesMessage(); - message.setContent(new ByteSequence(content, offset, length)); - return message; - } - - /** - * @return a new vendor specific StreamMessage instance. - */ - public StreamMessage createStreamMessage() { - return new ActiveMQStreamMessage(); - } - - /** - * @return a new vendor specific TextMessage instance. - */ - public TextMessage createTextMessage() { - return new ActiveMQTextMessage(); - } - - /** - * @return a new vendor specific TextMessage instance with the given string in the body. - */ - public TextMessage createTextMessage(String text) { - ActiveMQTextMessage message = new ActiveMQTextMessage(); - try { - message.setText(text); - } catch (MessageNotWriteableException ex) {} - - return message; - } - - /** - * @return a new vendor specific ObjectMessage instance. - */ - public ObjectMessage createObjectMessage() { - return new ActiveMQObjectMessage(); - } - - /** - * @return a new vendor specific ObjectMessage instance with the serialized form given. - */ - public ObjectMessage createObjectMessage(byte[] content, int offset, int length) { - ActiveMQObjectMessage message = new ActiveMQObjectMessage(); - message.setContent(new ByteSequence(content, offset, length)); - return message; - } - - /** - * @return a new vendor specific MapMessage instance. - */ - public MapMessage createMapMessage() { - return new ActiveMQMapMessage(); - } - - /** - * @return a new vendor specific MapMessage instance with the given map as its content. - */ - public MapMessage createMapMessage(Map content) throws JMSException { - ActiveMQMapMessage message = new ActiveMQMapMessage(); - final Set> set = content.entrySet(); - for (Map.Entry entry : set) { - message.setObject(entry.getKey(), entry.getValue()); - } - return message; - } - - /** - * Creates a new JMS Destination instance from the given name. - * - * @param name - * the name to use to construct the new Destination - * - * @return a new JMS Destination object derived from the given name. - */ - public Destination createDestination(String name) { - return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE); - } - - /** - * Set the given value as the JMSXUserID on the message instance. - * - * @param message - * the message to be updated. - * @param value - * the value to apply to the message. - */ - public void setJMSXUserID(Message msg, String value) { - ((ActiveMQMessage) msg).setUserID(value); - } - - /** - * Set the given value as the JMSXGroupID on the message instance. - * - * @param message - * the message to be updated. - * @param value - * the value to apply to the message. - */ - public void setJMSXGroupID(Message msg, String value) { - ((ActiveMQMessage) msg).setGroupID(value); - } - - /** - * Set the given value as the JMSXGroupSequence on the message instance. - * - * @param message - * the message to be updated. - * @param value - * the value to apply to the message. - */ - public void setJMSXGroupSequence(Message msg, int value) { - ((ActiveMQMessage) msg).setGroupSequence(value); - } - - /** - * Set the given value as the JMSXDeliveryCount on the message instance. - * - * @param message - * the message to be updated. - * @param value - * the value to apply to the message. - */ - public void setJMSXDeliveryCount(Message msg, long value) { - ((ActiveMQMessage) msg).setRedeliveryCounter((int) value); - } - - /** - * Convert the given JMS Destination into the appropriate AMQP address string - * for assignment to the 'to' or 'replyTo' field of an AMQP message. - * - * @param destination - * the JMS Destination instance to be converted. - * - * @return the converted string address to assign to the message. - */ - public String toAddress(Destination dest) { - return ((ActiveMQDestination) dest).getQualifiedName(); - } - - /** - * Given an Message instance return the original Message ID that was assigned the - * Message when it was first processed by the broker. For an AMQP message this - * should be the original value of the message's MessageId field with the correct - * type preserved. - * - * @param message - * the message which is being accessed. - * - * @return the original MessageId assigned to this Message instance. - */ - public Object getOriginalMessageId(Message message) { - Object result; - MessageId msgId = ((ActiveMQMessage)message).getMessageId(); - if (msgId.getTextView() != null) { - try { - result = AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()); - } catch (AmqpProtocolException e) { - result = msgId.getTextView().toString(); - } - } else { - result = msgId.toString(); - } - - return result; - } - - /** - * Return the encoded form of the BytesMessage as an AMQP Binary instance. - * - * @param message - * the Message whose binary encoded body is needed. - * - * @return a Binary instance containing the encoded message body. - * - * @throws JMSException if an error occurs while fetching the binary payload. - */ - public Binary getBinaryFromMessageBody(BytesMessage message) throws JMSException { - ActiveMQBytesMessage bytesMessage = (ActiveMQBytesMessage) message; - Binary result = null; - - if (bytesMessage.getContent() != null) { - ByteSequence contents = bytesMessage.getContent(); - - if (bytesMessage.isCompressed()) { - int length = (int) bytesMessage.getBodyLength(); - byte[] uncompressed = new byte[length]; - bytesMessage.readBytes(uncompressed); - - result = new Binary(uncompressed); - } else { - return new Binary(contents.getData(), contents.getOffset(), contents.getLength()); - } - } - - return result; - } - - /** - * Return the encoded form of the BytesMessage as an AMQP Binary instance. - * - * @param message - * the Message whose binary encoded body is needed. - * - * @return a Binary instance containing the encoded message body. - * - * @throws JMSException if an error occurs while fetching the binary payload. - */ - public Binary getBinaryFromMessageBody(ObjectMessage message) throws JMSException { - ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) message; - Binary result = null; - - if (objectMessage.getContent() != null) { - ByteSequence contents = objectMessage.getContent(); - - if (objectMessage.isCompressed()) { - try (ByteArrayOutputStream os = new ByteArrayOutputStream(); - ByteArrayInputStream is = new ByteArrayInputStream(contents); - InflaterInputStream iis = new InflaterInputStream(is);) { - - byte value; - while ((value = (byte) iis.read()) != -1) { - os.write(value); - } - - ByteSequence expanded = os.toByteSequence(); - result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength()); - } catch (Exception cause) { - throw JMSExceptionSupport.create(cause); - } - } else { - return new Binary(contents.getData(), contents.getOffset(), contents.getLength()); - } - } - - return result; - } - - /** - * Return the encoded form of the Message as an AMQP Binary instance. - * - * @param message - * the Message whose binary encoded body is needed. - * - * @return a Binary instance containing the encoded message body. - * - * @throws JMSException if an error occurs while fetching the binary payload. - */ - public Binary getBinaryFromMessageBody(TextMessage message) throws JMSException { - ActiveMQTextMessage textMessage = (ActiveMQTextMessage) message; - Binary result = null; - - if (textMessage.getContent() != null) { - ByteSequence contents = textMessage.getContent(); - - if (textMessage.isCompressed()) { - try (ByteArrayInputStream is = new ByteArrayInputStream(contents); - InflaterInputStream iis = new InflaterInputStream(is); - DataInputStream dis = new DataInputStream(iis);) { - - int size = dis.readInt(); - byte[] uncompressed = new byte[size]; - dis.readFully(uncompressed); - - result = new Binary(uncompressed); - } catch (Exception cause) { - throw JMSExceptionSupport.create(cause); - } - } else { - // Message includes a size prefix of four bytes for the OpenWire marshaler - result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4); - } - } else if (textMessage.getText() != null) { - result = new Binary(textMessage.getText().getBytes(StandardCharsets.UTF_8)); - } - - return result; - } - - /** - * Return the underlying Map from the JMS MapMessage instance. - * - * @param message - * the MapMessage whose underlying Map is requested. - * - * @return the underlying Map used to store the value in the given MapMessage. - * - * @throws JMSException if an error occurs in constructing or fetching the Map. - */ - public Map getMapFromMessageBody(MapMessage message) throws JMSException { - final HashMap map = new HashMap(); - final ActiveMQMapMessage mapMessage = (ActiveMQMapMessage) message; - - final Map contentMap = mapMessage.getContentMap(); - if (contentMap != null) { - map.putAll(contentMap); - } - - return contentMap; - } - - /** - * Sets the given Message Property on the given message overriding any read-only - * state on the Message long enough for the property to be added. - * - * @param message - * the message to set the property on. - * @param key - * the String key for the new Message property - * @param value - * the Object to assign to the new Message property. - * - * @throws JMSException if an error occurs while setting the property. - */ - public void setMessageProperty(Message message, String key, Object value) throws JMSException { - final ActiveMQMessage amqMessage = (ActiveMQMessage) message; - - boolean oldValue = amqMessage.isReadOnlyProperties(); - - amqMessage.setReadOnlyProperties(false); - amqMessage.setObjectProperty(key, value); - amqMessage.setReadOnlyProperties(oldValue); - } -} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java index 3e7a60eb77..4f468ba2a1 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpMessageSupport.java @@ -16,13 +16,26 @@ */ package org.apache.activemq.transport.amqp.message; -import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.io.Serializable; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; +import java.util.zip.InflaterInputStream; +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.util.ByteArrayInputStream; +import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.activemq.util.ByteSequence; +import org.apache.activemq.util.JMSExceptionSupport; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Data; @@ -34,12 +47,44 @@ import org.apache.qpid.proton.message.Message; */ public final class AmqpMessageSupport { + // Message Properties used to map AMQP to JMS and back + + public static final String JMS_AMQP_PREFIX = "JMS_AMQP_"; + public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length(); + + public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT"; + public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING"; + public static final String NATIVE = "NATIVE"; + public static final String HEADER = "HEADER"; + public static final String PROPERTIES = "PROPERTIES"; + + public static final String FIRST_ACQUIRER = "FirstAcquirer"; + public static final String CONTENT_TYPE = "ContentType"; + public static final String CONTENT_ENCODING = "ContentEncoding"; + public static final String REPLYTO_GROUP_ID = "ReplyToGroupID"; + + public static final String DELIVERY_ANNOTATION_PREFIX = "DA_"; + public static final String MESSAGE_ANNOTATION_PREFIX = "MA_"; + public static final String FOOTER_PREFIX = "FT_"; + + public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER; + public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES; + public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING; + public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT; + public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE; + public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER; + public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE; + public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING; + public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID; + public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX; + public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX; + public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX; + + // Message body type definitions public static final Binary EMPTY_BINARY = new Binary(new byte[0]); public static final Data EMPTY_BODY = new Data(EMPTY_BINARY); public static final Data NULL_OBJECT_BODY; - public static final String AMQP_ORIGINAL_ENCODING_KEY = "JMS_AMQP_ORIGINAL_ENCODING"; - public static final short AMQP_UNKNOWN = 0; public static final short AMQP_NULL = 1; public static final short AMQP_DATA = 2; @@ -147,4 +192,134 @@ public final class AmqpMessageSupport { return baos.toByteArray(); } } + + /** + * Return the encoded form of the BytesMessage as an AMQP Binary instance. + * + * @param message + * the Message whose binary encoded body is needed. + * + * @return a Binary instance containing the encoded message body. + * + * @throws JMSException if an error occurs while fetching the binary payload. + */ + public static Binary getBinaryFromMessageBody(ActiveMQBytesMessage message) throws JMSException { + Binary result = null; + + if (message.getContent() != null) { + ByteSequence contents = message.getContent(); + + if (message.isCompressed()) { + int length = (int) message.getBodyLength(); + byte[] uncompressed = new byte[length]; + message.readBytes(uncompressed); + + result = new Binary(uncompressed); + } else { + return new Binary(contents.getData(), contents.getOffset(), contents.getLength()); + } + } + + return result; + } + + /** + * Return the encoded form of the BytesMessage as an AMQP Binary instance. + * + * @param message + * the Message whose binary encoded body is needed. + * + * @return a Binary instance containing the encoded message body. + * + * @throws JMSException if an error occurs while fetching the binary payload. + */ + public static Binary getBinaryFromMessageBody(ActiveMQObjectMessage message) throws JMSException { + Binary result = null; + + if (message.getContent() != null) { + ByteSequence contents = message.getContent(); + + if (message.isCompressed()) { + try (ByteArrayOutputStream os = new ByteArrayOutputStream(); + ByteArrayInputStream is = new ByteArrayInputStream(contents); + InflaterInputStream iis = new InflaterInputStream(is);) { + + byte value; + while ((value = (byte) iis.read()) != -1) { + os.write(value); + } + + ByteSequence expanded = os.toByteSequence(); + result = new Binary(expanded.getData(), expanded.getOffset(), expanded.getLength()); + } catch (Exception cause) { + throw JMSExceptionSupport.create(cause); + } + } else { + return new Binary(contents.getData(), contents.getOffset(), contents.getLength()); + } + } + + return result; + } + + /** + * Return the encoded form of the Message as an AMQP Binary instance. + * + * @param message + * the Message whose binary encoded body is needed. + * + * @return a Binary instance containing the encoded message body. + * + * @throws JMSException if an error occurs while fetching the binary payload. + */ + public static Binary getBinaryFromMessageBody(ActiveMQTextMessage message) throws JMSException { + Binary result = null; + + if (message.getContent() != null) { + ByteSequence contents = message.getContent(); + + if (message.isCompressed()) { + try (ByteArrayInputStream is = new ByteArrayInputStream(contents); + InflaterInputStream iis = new InflaterInputStream(is); + DataInputStream dis = new DataInputStream(iis);) { + + int size = dis.readInt(); + byte[] uncompressed = new byte[size]; + dis.readFully(uncompressed); + + result = new Binary(uncompressed); + } catch (Exception cause) { + throw JMSExceptionSupport.create(cause); + } + } else { + // Message includes a size prefix of four bytes for the OpenWire marshaler + result = new Binary(contents.getData(), contents.getOffset() + 4, contents.getLength() - 4); + } + } else if (message.getText() != null) { + result = new Binary(message.getText().getBytes(StandardCharsets.UTF_8)); + } + + return result; + } + + /** + * Return the underlying Map from the JMS MapMessage instance. + * + * @param message + * the MapMessage whose underlying Map is requested. + * + * @return the underlying Map used to store the value in the given MapMessage. + * + * @throws JMSException if an error occurs in constructing or fetching the Map. + */ + public static Map getMapFromMessageBody(ActiveMQMapMessage message) throws JMSException { + final HashMap map = new HashMap(); + + final Map contentMap = message.getContentMap(); + if (contentMap != null) { + map.putAll(contentMap); + } + + return contentMap; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java new file mode 100644 index 0000000000..399eb3fa9a --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AmqpWritableBuffer.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import java.nio.ByteBuffer; + +import org.apache.qpid.proton.codec.WritableBuffer; + +/** + * + */ +public class AmqpWritableBuffer implements WritableBuffer { + + public final static int DEFAULT_CAPACITY = 4 * 1024; + + byte buffer[]; + int position; + + /** + * Creates a new WritableBuffer with default capacity. + */ + public AmqpWritableBuffer() { + this(DEFAULT_CAPACITY); + } + + /** + * Create a new WritableBuffer with the given capacity. + */ + public AmqpWritableBuffer(int capacity) { + this.buffer = new byte[capacity]; + } + + public byte[] getArray() { + return buffer; + } + + public int getArrayLength() { + return position; + } + + @Override + public void put(byte b) { + int newPosition = position + 1; + ensureCapacity(newPosition); + buffer[position] = b; + position = newPosition; + } + + @Override + public void putShort(short value) { + ensureCapacity(position + 2); + buffer[position++] = (byte)(value >>> 8); + buffer[position++] = (byte)(value >>> 0); + } + + @Override + public void putInt(int value) { + ensureCapacity(position + 4); + buffer[position++] = (byte)(value >>> 24); + buffer[position++] = (byte)(value >>> 16); + buffer[position++] = (byte)(value >>> 8); + buffer[position++] = (byte)(value >>> 0); + } + + @Override + public void putLong(long value) { + ensureCapacity(position + 8); + buffer[position++] = (byte)(value >>> 56); + buffer[position++] = (byte)(value >>> 48); + buffer[position++] = (byte)(value >>> 40); + buffer[position++] = (byte)(value >>> 32); + buffer[position++] = (byte)(value >>> 24); + buffer[position++] = (byte)(value >>> 16); + buffer[position++] = (byte)(value >>> 8); + buffer[position++] = (byte)(value >>> 0); + } + + @Override + public void putFloat(float value) { + putInt(Float.floatToRawIntBits(value)); + } + + @Override + public void putDouble(double value) { + putLong(Double.doubleToRawLongBits(value)); + } + + @Override + public void put(byte[] src, int offset, int length) { + if (length == 0) { + return; + } + + int newPosition = position + length; + ensureCapacity(newPosition); + System.arraycopy(src, offset, buffer, position, length); + position = newPosition; + } + + @Override + public boolean hasRemaining() { + return position < Integer.MAX_VALUE; + } + + @Override + public int remaining() { + return Integer.MAX_VALUE - position; + } + + @Override + public int position() { + return position; + } + + @Override + public void position(int position) { + ensureCapacity(position); + this.position = position; + } + + @Override + public void put(ByteBuffer payload) { + int newPosition = position + payload.remaining(); + ensureCapacity(newPosition); + while (payload.hasRemaining()) { + buffer[position++] = payload.get(); + } + + position = newPosition; + } + + @Override + public int limit() { + return Integer.MAX_VALUE; + } + + /** + * Ensures the the buffer has at least the minimumCapacity specified. + * + * @param minimumCapacity + * the minimum capacity needed to meet the next write operation. + */ + private void ensureCapacity(int minimumCapacity) { + if (minimumCapacity > buffer.length) { + byte newBuffer[] = new byte[Math.max(buffer.length << 1, minimumCapacity)]; + System.arraycopy(buffer, 0, newBuffer, 0, position); + buffer = newBuffer; + } + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java index f0f71a8bce..edfdecfc1f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java @@ -16,33 +16,31 @@ */ package org.apache.activemq.transport.amqp.message; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_NATIVE; + import javax.jms.BytesMessage; -import javax.jms.Message; + +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMessage; public class AutoOutboundTransformer extends JMSMappingOutboundTransformer { - private final JMSMappingOutboundTransformer transformer; - - public AutoOutboundTransformer(ActiveMQJMSVendor vendor) { - super(vendor); - - transformer = new JMSMappingOutboundTransformer(vendor); - } + private final JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); @Override - public EncodedMessage transform(Message msg) throws Exception { - if (msg == null) { + public EncodedMessage transform(ActiveMQMessage message) throws Exception { + if (message == null) { return null; } - if (msg.getBooleanProperty(prefixVendor + "NATIVE")) { - if (msg instanceof BytesMessage) { - return AMQPNativeOutboundTransformer.transform(this, (BytesMessage) msg); + if (message.getBooleanProperty(JMS_AMQP_NATIVE)) { + if (message instanceof BytesMessage) { + return AMQPNativeOutboundTransformer.transform(this, (ActiveMQBytesMessage) message); } else { return null; } } else { - return transformer.transform(msg); + return transformer.transform(message); } } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index e6b7a0fe39..323a9c14bc 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -16,14 +16,25 @@ */ package org.apache.activemq.transport.amqp.message; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_ENCODING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FIRST_ACQUIRER; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_HEADER; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PROPERTIES; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID; + +import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Set; -import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.transport.amqp.AmqpProtocolException; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Decimal128; @@ -42,32 +53,17 @@ import org.apache.qpid.proton.amqp.messaging.Properties; public abstract class InboundTransformer { - protected final ActiveMQJMSVendor vendor; - public static final String TRANSFORMER_NATIVE = "native"; public static final String TRANSFORMER_RAW = "raw"; public static final String TRANSFORMER_JMS = "jms"; - protected String prefixVendor = "JMS_AMQP_"; - protected String prefixDeliveryAnnotations = "DA_"; - protected String prefixMessageAnnotations = "MA_"; - protected String prefixFooter = "FT_"; - - protected int defaultDeliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT; - protected int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; - protected long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; - - public InboundTransformer(ActiveMQJMSVendor vendor) { - this.vendor = vendor; - } - public abstract String getTransformerName(); public abstract InboundTransformer getFallbackTransformer(); - public final Message transform(EncodedMessage amqpMessage) throws Exception { + public final ActiveMQMessage transform(EncodedMessage amqpMessage) throws Exception { InboundTransformer transformer = this; - Message message = null; + ActiveMQMessage message = null; while (transformer != null) { try { @@ -85,79 +81,40 @@ public abstract class InboundTransformer { return message; } - protected abstract Message doTransform(EncodedMessage amqpMessage) throws Exception; - - public int getDefaultDeliveryMode() { - return defaultDeliveryMode; - } - - public void setDefaultDeliveryMode(int defaultDeliveryMode) { - this.defaultDeliveryMode = defaultDeliveryMode; - } - - public int getDefaultPriority() { - return defaultPriority; - } - - public void setDefaultPriority(int defaultPriority) { - this.defaultPriority = defaultPriority; - } - - public long getDefaultTtl() { - return defaultTtl; - } - - public void setDefaultTtl(long defaultTtl) { - this.defaultTtl = defaultTtl; - } - - public String getPrefixVendor() { - return prefixVendor; - } - - public void setPrefixVendor(String prefixVendor) { - this.prefixVendor = prefixVendor; - } - - public ActiveMQJMSVendor getVendor() { - return vendor; - } + protected abstract ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception; @SuppressWarnings("unchecked") - protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + protected void populateMessage(ActiveMQMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception { Header header = amqp.getHeader(); - if (header == null) { - header = new Header(); - } + if (header != null) { + jms.setBooleanProperty(JMS_AMQP_HEADER, true); - if (header.getDurable() != null) { - jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + if (header.getDurable() != null) { + jms.setPersistent(header.getDurable().booleanValue()); + } + + if (header.getPriority() != null) { + jms.setJMSPriority(header.getPriority().intValue()); + } else { + jms.setPriority((byte) Message.DEFAULT_PRIORITY); + } + + if (header.getFirstAcquirer() != null) { + jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer()); + } + + if (header.getDeliveryCount() != null) { + jms.setRedeliveryCounter(header.getDeliveryCount().intValue()); + } } else { - jms.setJMSDeliveryMode(defaultDeliveryMode); - } - - if (header.getPriority() != null) { - jms.setJMSPriority(header.getPriority().intValue()); - } else { - jms.setJMSPriority(defaultPriority); - } - - if (header.getFirstAcquirer() != null) { - jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); - } - - if (header.getDeliveryCount() != null) { - vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); + jms.setPriority((byte) Message.DEFAULT_PRIORITY); } final MessageAnnotations ma = amqp.getMessageAnnotations(); if (ma != null) { for (Map.Entry entry : ma.getValue().entrySet()) { String key = entry.getKey().toString(); - if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { - // Legacy annotation, JMSType value will be replaced by Subject further down if also present. - jms.setJMSType(entry.getValue().toString()); - } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { long deliveryTime = ((Number) entry.getValue()).longValue(); long delay = deliveryTime - System.currentTimeMillis(); if (delay > 0) { @@ -185,82 +142,72 @@ public abstract class InboundTransformer { } } - setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); + setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue()); } } final ApplicationProperties ap = amqp.getApplicationProperties(); if (ap != null) { for (Map.Entry entry : (Set>) ap.getValue().entrySet()) { - String key = entry.getKey().toString(); - if ("JMSXGroupID".equals(key)) { - vendor.setJMSXGroupID(jms, entry.getValue().toString()); - } else if ("JMSXGroupSequence".equals(key)) { - vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue()); - } else if ("JMSXUserID".equals(key)) { - vendor.setJMSXUserID(jms, entry.getValue().toString()); - } else { - setProperty(jms, key, entry.getValue()); - } + setProperty(jms, entry.getKey().toString(), entry.getValue()); } } final Properties properties = amqp.getProperties(); if (properties != null) { + jms.setBooleanProperty(JMS_AMQP_PROPERTIES, true); if (properties.getMessageId() != null) { jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId())); } Binary userId = properties.getUserId(); if (userId != null) { - vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); + jms.setUserID(new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8)); } if (properties.getTo() != null) { - jms.setJMSDestination(vendor.createDestination(properties.getTo())); + jms.setDestination((ActiveMQDestination.createDestination(properties.getTo(), ActiveMQDestination.QUEUE_TYPE))); } if (properties.getSubject() != null) { - jms.setJMSType(properties.getSubject()); + jms.setType(properties.getSubject()); } if (properties.getReplyTo() != null) { - jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); + jms.setReplyTo((ActiveMQDestination.createDestination(properties.getReplyTo(), ActiveMQDestination.QUEUE_TYPE))); } if (properties.getCorrelationId() != null) { - jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); + jms.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); } if (properties.getContentType() != null) { - jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); + jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString()); } if (properties.getContentEncoding() != null) { - jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); + jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString()); } if (properties.getCreationTime() != null) { - jms.setJMSTimestamp(properties.getCreationTime().getTime()); + jms.setTimestamp(properties.getCreationTime().getTime()); } if (properties.getGroupId() != null) { - vendor.setJMSXGroupID(jms, properties.getGroupId()); + jms.setGroupID(properties.getGroupId()); } if (properties.getGroupSequence() != null) { - vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); + jms.setGroupSequence(properties.getGroupSequence().intValue()); } if (properties.getReplyToGroupId() != null) { - jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); + jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId()); } if (properties.getAbsoluteExpiryTime() != null) { - jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); + jms.setExpiration(properties.getAbsoluteExpiryTime().getTime()); } } // If the jms expiration has not yet been set... - if (jms.getJMSExpiration() == 0) { + if (header != null && jms.getJMSExpiration() == 0) { // Then lets try to set it based on the message ttl. - long ttl = defaultTtl; + long ttl = Message.DEFAULT_TIME_TO_LIVE; if (header.getTtl() != null) { ttl = header.getTtl().longValue(); } - if (ttl == 0) { - jms.setJMSExpiration(0); - } else { - jms.setJMSExpiration(System.currentTimeMillis() + ttl); + if (ttl != javax.jms.Message.DEFAULT_TIME_TO_LIVE) { + jms.setExpiration(System.currentTimeMillis() + ttl); } } @@ -268,7 +215,7 @@ public abstract class InboundTransformer { if (fp != null) { for (Map.Entry entry : (Set>) fp.getValue().entrySet()) { String key = entry.getKey().toString(); - setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); + setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue()); } } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java index 707e5da4dc..79e4c2cbaf 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java @@ -18,13 +18,14 @@ package org.apache.activemq.transport.amqp.message; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL; -import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_MAP; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_NULL; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getCharsetForTextualContent; @@ -37,10 +38,19 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; +import java.util.Set; -import javax.jms.StreamMessage; +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.transport.amqp.AmqpProtocolException; +import org.apache.activemq.util.ByteSequence; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; @@ -50,10 +60,6 @@ import org.apache.qpid.proton.message.Message; public class JMSMappingInboundTransformer extends InboundTransformer { - public JMSMappingInboundTransformer(ActiveMQJMSVendor vendor) { - super(vendor); - } - @Override public String getTransformerName() { return TRANSFORMER_JMS; @@ -61,55 +67,52 @@ public class JMSMappingInboundTransformer extends InboundTransformer { @Override public InboundTransformer getFallbackTransformer() { - return new AMQPNativeInboundTransformer(getVendor()); + return new AMQPNativeInboundTransformer(); } @Override - protected javax.jms.Message doTransform(EncodedMessage amqpMessage) throws Exception { + protected ActiveMQMessage doTransform(EncodedMessage amqpMessage) throws Exception { Message amqp = amqpMessage.decode(); - javax.jms.Message result = createMessage(amqp, amqpMessage); - - result.setJMSDeliveryMode(defaultDeliveryMode); - result.setJMSPriority(defaultPriority); - result.setJMSExpiration(defaultTtl); + ActiveMQMessage result = createMessage(amqp, amqpMessage); populateMessage(result, amqp); - result.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); - result.setBooleanProperty(prefixVendor + "NATIVE", false); + if (amqpMessage.getMessageFormat() != 0) { + result.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat()); + } return result; } @SuppressWarnings({ "unchecked" }) - private javax.jms.Message createMessage(Message message, EncodedMessage original) throws Exception { + private ActiveMQMessage createMessage(Message message, EncodedMessage original) throws Exception { Section body = message.getBody(); - javax.jms.Message result; + ActiveMQMessage result; if (body == null) { if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = vendor.createObjectMessage(); + result = new ActiveMQObjectMessage(); } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) { - result = vendor.createBytesMessage(); + result = new ActiveMQBytesMessage(); } else { Charset charset = getCharsetForTextualContent(message.getContentType()); if (charset != null) { - result = vendor.createTextMessage(); + result = new ActiveMQTextMessage(); } else { - result = vendor.createMessage(); + result = new ActiveMQMessage(); } } - result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); } else if (body instanceof Data) { Binary payload = ((Data) body).getValue(); if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) { - result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); } else { Charset charset = getCharsetForTextualContent(message.getContentType()); if (StandardCharsets.UTF_8.equals(charset)) { @@ -117,51 +120,51 @@ public class JMSMappingInboundTransformer extends InboundTransformer { try { CharBuffer chars = charset.newDecoder().decode(buf); - result = vendor.createTextMessage(String.valueOf(chars)); + result = createTextMessage(String.valueOf(chars)); } catch (CharacterCodingException e) { - result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); } } else { - result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); } } - result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); } else if (body instanceof AmqpSequence) { AmqpSequence sequence = (AmqpSequence) body; - StreamMessage m = vendor.createStreamMessage(); + ActiveMQStreamMessage m = new ActiveMQStreamMessage(); for (Object item : sequence.getValue()) { m.writeObject(item); } result = m; - result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); } else if (body instanceof AmqpValue) { Object value = ((AmqpValue) body).getValue(); if (value == null || value instanceof String) { - result = vendor.createTextMessage((String) value); + result = createTextMessage((String) value); - result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING); } else if (value instanceof Binary) { Binary payload = (Binary) value; if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) { - result = vendor.createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createObjectMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); } else { - result = vendor.createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); + result = createBytesMessage(payload.getArray(), payload.getArrayOffset(), payload.getLength()); } - result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); } else if (value instanceof List) { - StreamMessage m = vendor.createStreamMessage(); + ActiveMQStreamMessage m = new ActiveMQStreamMessage(); for (Object item : (List) value) { m.writeObject(item); } result = m; - result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_LIST); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST); } else if (value instanceof Map) { - result = vendor.createMapMessage((Map) value); - result.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_MAP); + result = createMapMessage((Map) value); + result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP); } else { // Trigger fall-back to native encoder which generates BytesMessage with the // original message stored in the message body. @@ -173,4 +176,34 @@ public class JMSMappingInboundTransformer extends InboundTransformer { return result; } + + private static ActiveMQBytesMessage createBytesMessage(byte[] content, int offset, int length) { + ActiveMQBytesMessage message = new ActiveMQBytesMessage(); + message.setContent(new ByteSequence(content, offset, length)); + return message; + } + + public static ActiveMQTextMessage createTextMessage(String text) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + try { + message.setText(text); + } catch (MessageNotWriteableException ex) {} + + return message; + } + + public static ActiveMQObjectMessage createObjectMessage(byte[] content, int offset, int length) { + ActiveMQObjectMessage message = new ActiveMQObjectMessage(); + message.setContent(new ByteSequence(content, offset, length)); + return message; + } + + public static ActiveMQMapMessage createMapMessage(Map content) throws JMSException { + ActiveMQMapMessage message = new ActiveMQMapMessage(); + final Set> set = content.entrySet(); + for (Map.Entry entry : set) { + message.setObject(entry.getKey(), entry.getValue()); + } + return message; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 59c306fc69..985f4f524d 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -18,40 +18,63 @@ package org.apache.activemq.transport.amqp.message; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL; -import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_LIST; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_STRING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_ENCODING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.CONTENT_TYPE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.DELIVERY_ANNOTATION_PREFIX; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.EMPTY_BINARY; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FIRST_ACQUIRER; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.FOOTER_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.HEADER; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_CONTENT_TYPE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_FOOTER_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_PREFIX_LENGTH; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_ANNOTATION_PREFIX; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.MESSAGE_FORMAT; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.NATIVE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.ORIGINAL_ENCODING; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.PROPERTIES; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.REPLYTO_GROUP_ID; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getBinaryFromMessageBody; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.getMapFromMessageBody; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Date; -import java.util.Enumeration; import java.util.HashMap; import java.util.Map; -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageEOFException; -import javax.jms.MessageFormatException; -import javax.jms.ObjectMessage; import javax.jms.Queue; -import javax.jms.StreamMessage; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.MessageId; import org.apache.activemq.transport.amqp.AmqpProtocolException; +import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.activemq.util.TypeConversionSupport; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; @@ -66,12 +89,12 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.CompositeWritableBuffer; -import org.apache.qpid.proton.codec.DroppingWritableBuffer; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.qpid.proton.codec.AMQPDefinedTypes; +import org.apache.qpid.proton.codec.DecoderImpl; +import org.apache.qpid.proton.codec.EncoderImpl; +import org.fusesource.hawtbuf.UTF8Buffer; -public class JMSMappingOutboundTransformer extends OutboundTransformer { +public class JMSMappingOutboundTransformer implements OutboundTransformer { public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); @@ -81,225 +104,276 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_TOPIC_TYPE = 0x03; - // Deprecated legacy values used by old QPid AMQP 1.0 JMS client. - - public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type"); - public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type"); - - public static final String LEGACY_QUEUE_TYPE = "queue"; - public static final String LEGACY_TOPIC_TYPE = "topic"; - public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue"; - public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic"; - - public JMSMappingOutboundTransformer(ActiveMQJMSVendor vendor) { - super(vendor); + // For now Proton requires that we create a decoder to create an encoder + private final DecoderImpl decoder = new DecoderImpl(); + private final EncoderImpl encoder = new EncoderImpl(decoder); + { + AMQPDefinedTypes.registerAllTypes(decoder, encoder); } @Override - public EncodedMessage transform(Message msg) throws Exception { - if (msg == null) { + public EncodedMessage transform(ActiveMQMessage message) throws Exception { + if (message == null) { return null; } - try { - if (msg.getBooleanProperty(prefixVendor + "NATIVE")) { - return null; - } - } catch (MessageFormatException e) { - return null; - } - ProtonJMessage amqp = convert(msg); - - long messageFormat; - try { - messageFormat = msg.getLongProperty(this.messageFormatKey); - } catch (MessageFormatException e) { - return null; - } - - ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]); - final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); - int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); - if (overflow.position() > 0) { - buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); - c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); - } - - return new EncodedMessage(messageFormat, buffer.array(), 0, c); - } - - /** - * Perform the conversion between JMS Message and Proton Message without - * re-encoding it to array. This is needed because some frameworks may elect - * to do this on their own way. - * - * @param message - * The message to transform into an AMQP version for dispatch. - * - * @return an AMQP Message that represents the given JMS Message. - * - * @throws Exception if an error occurs during the conversion. - */ - public ProtonJMessage convert(Message message) throws JMSException, UnsupportedEncodingException { - Header header = new Header(); - Properties props = new Properties(); - + long messageFormat = 0; + Header header = null; + Properties properties = null; Map daMap = null; Map maMap = null; Map apMap = null; Map footerMap = null; - Section body = null; - body = convertBody(message); + Section body = convertBody(message); - header.setDurable(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); - header.setPriority(new UnsignedByte((byte) message.getJMSPriority())); - if (message.getJMSType() != null) { - props.setSubject(message.getJMSType()); + if (message.isPersistent()) { + if (header == null) { + header = new Header(); + } + header.setDurable(true); } - if (message.getJMSMessageID() != null) { - props.setMessageId(vendor.getOriginalMessageId(message)); + byte priority = message.getPriority(); + if (priority != Message.DEFAULT_PRIORITY) { + if (header == null) { + header = new Header(); + } + header.setPriority(new UnsignedByte(priority)); } - if (message.getJMSDestination() != null) { - props.setTo(vendor.toAddress(message.getJMSDestination())); + String type = message.getType(); + if (type != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setSubject(type); + } + MessageId messageId = message.getMessageId(); + if (messageId != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setMessageId(getOriginalMessageId(message)); + } + ActiveMQDestination destination = message.getDestination(); + if (destination != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setTo(destination.getQualifiedName()); if (maMap == null) { maMap = new HashMap(); } - maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(message.getJMSDestination())); - - // Deprecated: used by legacy QPid AMQP 1.0 JMS client - maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSDestination())); + maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination)); } - if (message.getJMSReplyTo() != null) { - props.setReplyTo(vendor.toAddress(message.getJMSReplyTo())); + ActiveMQDestination replyTo = message.getReplyTo(); + if (replyTo != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyTo(replyTo.getQualifiedName()); if (maMap == null) { maMap = new HashMap(); } - maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(message.getJMSReplyTo())); - - // Deprecated: used by legacy QPid AMQP 1.0 JMS client - maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(message.getJMSReplyTo())); + maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo)); } - if (message.getJMSCorrelationID() != null) { - String correlationId = message.getJMSCorrelationID(); + String correlationId = message.getCorrelationId(); + if (correlationId != null) { + if (properties == null) { + properties = new Properties(); + } try { - props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); + properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); } catch (AmqpProtocolException e) { - props.setCorrelationId(correlationId); + properties.setCorrelationId(correlationId); } } - if (message.getJMSExpiration() != 0) { - long ttl = message.getJMSExpiration() - System.currentTimeMillis(); + long expiration = message.getExpiration(); + if (expiration != 0) { + long ttl = expiration - System.currentTimeMillis(); if (ttl < 0) { ttl = 1; } + + if (header == null) { + header = new Header(); + } header.setTtl(new UnsignedInteger((int) ttl)); - props.setAbsoluteExpiryTime(new Date(message.getJMSExpiration())); - } - if (message.getJMSTimestamp() != 0) { - props.setCreationTime(new Date(message.getJMSTimestamp())); - } - - @SuppressWarnings("unchecked") - final Enumeration keys = message.getPropertyNames(); - - while (keys.hasMoreElements()) { - String key = keys.nextElement(); - if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(AMQP_ORIGINAL_ENCODING_KEY)) { - // skip transformer appended properties - } else if (key.equals(firstAcquirerKey)) { - header.setFirstAcquirer(message.getBooleanProperty(key)); - } else if (key.startsWith("JMSXDeliveryCount")) { - // The AMQP delivery-count field only includes prior failed delivery attempts, - // whereas JMSXDeliveryCount includes the first/current delivery attempt. - int amqpDeliveryCount = message.getIntProperty(key) - 1; - if (amqpDeliveryCount > 0) { - header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); - } - } else if (key.startsWith("JMSXUserID")) { - String value = message.getStringProperty(key); - props.setUserId(new Binary(value.getBytes("UTF-8"))); - } else if (key.startsWith("JMSXGroupID")) { - String value = message.getStringProperty(key); - props.setGroupId(value); - if (apMap == null) { - apMap = new HashMap(); - } - apMap.put(key, value); - } else if (key.startsWith("JMSXGroupSeq")) { - UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key)); - props.setGroupSequence(value); - if (apMap == null) { - apMap = new HashMap(); - } - apMap.put(key, value); - } else if (key.startsWith(prefixDeliveryAnnotationsKey)) { - if (daMap == null) { - daMap = new HashMap(); - } - String name = key.substring(prefixDeliveryAnnotationsKey.length()); - daMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - } else if (key.startsWith(prefixMessageAnnotationsKey)) { - if (maMap == null) { - maMap = new HashMap(); - } - String name = key.substring(prefixMessageAnnotationsKey.length()); - maMap.put(Symbol.valueOf(name), message.getObjectProperty(key)); - } else if (key.equals(contentTypeKey)) { - props.setContentType(Symbol.getSymbol(message.getStringProperty(key))); - } else if (key.equals(contentEncodingKey)) { - props.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key))); - } else if (key.equals(replyToGroupIDKey)) { - props.setReplyToGroupId(message.getStringProperty(key)); - } else if (key.startsWith(prefixFooterKey)) { - if (footerMap == null) { - footerMap = new HashMap(); - } - String name = key.substring(prefixFooterKey.length()); - footerMap.put(name, message.getObjectProperty(key)); - } else { - if (apMap == null) { - apMap = new HashMap(); - } - apMap.put(key, message.getObjectProperty(key)); + if (properties == null) { + properties = new Properties(); } + properties.setAbsoluteExpiryTime(new Date(expiration)); + } + long timeStamp = message.getTimestamp(); + if (timeStamp != 0) { + if (properties == null) { + properties = new Properties(); + } + properties.setCreationTime(new Date(timeStamp)); } - MessageAnnotations ma = null; - if (maMap != null) { - ma = new MessageAnnotations(maMap); + // JMSX Message Properties + int deliveryCount = message.getRedeliveryCounter(); + if (deliveryCount > 0) { + if (header == null) { + header = new Header(); + } + header.setDeliveryCount(new UnsignedInteger(deliveryCount)); + } + String userId = message.getUserID(); + if (userId != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8))); + } + String groupId = message.getGroupID(); + if (groupId != null) { + if (properties == null) { + properties = new Properties(); + } + properties.setGroupId(groupId); + } + int groupSequence = message.getGroupSequence(); + if (groupSequence > 0) { + UnsignedInteger value = new UnsignedInteger(groupSequence); + if (properties == null) { + properties = new Properties(); + } + properties.setGroupSequence(value); + } + + final Map entries; + try { + entries = message.getProperties(); + } catch (IOException e) { + throw JMSExceptionSupport.create(e); + } + + for (Map.Entry entry : entries.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (value instanceof UTF8Buffer) { + value = value.toString(); + } + + if (key.startsWith(JMS_AMQP_PREFIX)) { + if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)) { + // skip transformer appended properties + continue; + } else if (key.startsWith(ORIGINAL_ENCODING, JMS_AMQP_PREFIX_LENGTH)) { + // skip transformer appended properties + continue; + } else if (key.startsWith(MESSAGE_FORMAT, JMS_AMQP_PREFIX_LENGTH)) { + messageFormat = (long) TypeConversionSupport.convert(entry.getValue(), Long.class); + continue; + } else if (key.startsWith(HEADER, JMS_AMQP_PREFIX_LENGTH)) { + if (header == null) { + header = new Header(); + } + continue; + } else if (key.startsWith(PROPERTIES, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + continue; + } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { + if (maMap == null) { + maMap = new HashMap(); + } + String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length()); + maMap.put(Symbol.valueOf(name), value); + continue; + } else if (key.startsWith(FIRST_ACQUIRER, JMS_AMQP_PREFIX_LENGTH)) { + if (header == null) { + header = new Header(); + } + header.setFirstAcquirer((boolean) TypeConversionSupport.convert(value, Boolean.class)); + continue; + } else if (key.startsWith(CONTENT_TYPE, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentType(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class))); + continue; + } else if (key.startsWith(CONTENT_ENCODING, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + properties.setContentEncoding(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class))); + continue; + } else if (key.startsWith(REPLYTO_GROUP_ID, JMS_AMQP_PREFIX_LENGTH)) { + if (properties == null) { + properties = new Properties(); + } + properties.setReplyToGroupId((String) TypeConversionSupport.convert(value, String.class)); + continue; + } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { + if (daMap == null) { + daMap = new HashMap(); + } + String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length()); + daMap.put(Symbol.valueOf(name), value); + continue; + } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) { + if (footerMap == null) { + footerMap = new HashMap(); + } + String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length()); + footerMap.put(name, value); + continue; + } + } + + // The property didn't map into any other slot so we store it in the + // Application Properties section of the message. + if (apMap == null) { + apMap = new HashMap(); + } + apMap.put(key, value); + } + + final AmqpWritableBuffer buffer = new AmqpWritableBuffer(); + encoder.setByteBuffer(buffer); + + if (header != null) { + encoder.writeObject(header); } - DeliveryAnnotations da = null; if (daMap != null) { - da = new DeliveryAnnotations(daMap); + encoder.writeObject(new DeliveryAnnotations(daMap)); + } + if (maMap != null) { + encoder.writeObject(new MessageAnnotations(maMap)); + } + if (properties != null) { + encoder.writeObject(properties); } - ApplicationProperties ap = null; if (apMap != null) { - ap = new ApplicationProperties(apMap); + encoder.writeObject(new ApplicationProperties(apMap)); + } + if (body != null) { + encoder.writeObject(body); } - Footer footer = null; if (footerMap != null) { - footer = new Footer(footerMap); + encoder.writeObject(new Footer(footerMap)); } - return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer); + return new EncodedMessage(messageFormat, buffer.getArray(), 0, buffer.getArrayLength()); } - private Section convertBody(Message message) throws JMSException { + private Section convertBody(ActiveMQMessage message) throws JMSException { Section body = null; short orignalEncoding = AMQP_UNKNOWN; - if (message.propertyExists(AMQP_ORIGINAL_ENCODING_KEY)) { - try { - orignalEncoding = message.getShortProperty(AMQP_ORIGINAL_ENCODING_KEY); - } catch (Exception ex) { - } + try { + orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING); + } catch (Exception ex) { + // Ignore and stick with UNKNOWN } - if (message instanceof BytesMessage) { - Binary payload = vendor.getBinaryFromMessageBody((BytesMessage) message); + if (message instanceof ActiveMQBytesMessage) { + Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message); if (payload == null) { payload = EMPTY_BINARY; @@ -317,12 +391,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { body = new Data(payload); break; } - } else if (message instanceof TextMessage) { + } else if (message instanceof ActiveMQTextMessage) { switch (orignalEncoding) { case AMQP_NULL: break; case AMQP_DATA: - body = new Data(vendor.getBinaryFromMessageBody((TextMessage) message)); + body = new Data(getBinaryFromMessageBody((ActiveMQTextMessage) message)); break; case AMQP_VALUE_STRING: case AMQP_UNKNOWN: @@ -330,11 +404,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { body = new AmqpValue(((TextMessage) message).getText()); break; } - } else if (message instanceof MapMessage) { - body = new AmqpValue(vendor.getMapFromMessageBody((MapMessage) message)); - } else if (message instanceof StreamMessage) { + } else if (message instanceof ActiveMQMapMessage) { + body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message)); + } else if (message instanceof ActiveMQStreamMessage) { ArrayList list = new ArrayList(); - final StreamMessage m = (StreamMessage) message; + final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message; try { while (true) { list.add(m.readObject()); @@ -352,8 +426,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { body = new AmqpValue(list); break; } - } else if (message instanceof ObjectMessage) { - Binary payload = vendor.getBinaryFromMessageBody((ObjectMessage) message); + } else if (message instanceof ActiveMQObjectMessage) { + Binary payload = getBinaryFromMessageBody((ActiveMQObjectMessage) message); if (payload == null) { payload = EMPTY_BINARY; @@ -373,8 +447,10 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { // For a non-AMQP message we tag the outbound content type as containing // a serialized Java object so that an AMQP client has a hint as to what // we are sending it. - if (!message.propertyExists(contentTypeKey)) { - vendor.setMessageProperty(message, contentTypeKey, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) { + message.setReadOnlyProperties(false); + message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); + message.setReadOnlyProperties(true); } } @@ -399,23 +475,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); } - // Used by legacy QPid AMQP 1.0 JMS client. - @Deprecated - private static String destinationAttributes(Destination destination) { - if (destination instanceof Queue) { - if (destination instanceof TemporaryQueue) { - return LEGACY_TEMP_QUEUE_TYPE; - } else { - return LEGACY_QUEUE_TYPE; - } - } else if (destination instanceof Topic) { - if (destination instanceof TemporaryTopic) { - return LEGACY_TEMP_TOPIC_TYPE; - } else { - return LEGACY_TOPIC_TYPE; + private static Object getOriginalMessageId(ActiveMQMessage message) { + Object result; + MessageId messageId = message.getMessageId(); + if (messageId.getTextView() != null) { + try { + result = AMQPMessageIdHelper.INSTANCE.toIdObject(messageId.getTextView()); + } catch (AmqpProtocolException e) { + result = messageId.getTextView(); } + } else { + result = messageId.toString(); } - throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer."); + return result; } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java index 2eefa50c29..6ca9cedd5e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java @@ -16,54 +16,10 @@ */ package org.apache.activemq.transport.amqp.message; -import javax.jms.Message; +import org.apache.activemq.command.ActiveMQMessage; -public abstract class OutboundTransformer { +public interface OutboundTransformer { - protected final ActiveMQJMSVendor vendor; + public abstract EncodedMessage transform(ActiveMQMessage message) throws Exception; - protected String prefixVendor; - - protected String prefixDeliveryAnnotations = "DA_"; - protected String prefixMessageAnnotations= "MA_"; - protected String prefixFooter = "FT_"; - - protected String messageFormatKey; - protected String nativeKey; - protected String firstAcquirerKey; - protected String prefixDeliveryAnnotationsKey; - protected String prefixMessageAnnotationsKey; - protected String contentTypeKey; - protected String contentEncodingKey; - protected String replyToGroupIDKey; - protected String prefixFooterKey; - - public OutboundTransformer(ActiveMQJMSVendor vendor) { - this.vendor = vendor; - this.setPrefixVendor("JMS_AMQP_"); - } - - public abstract EncodedMessage transform(Message jms) throws Exception; - - public String getPrefixVendor() { - return prefixVendor; - } - - public void setPrefixVendor(String prefixVendor) { - this.prefixVendor = prefixVendor; - - messageFormatKey = prefixVendor + "MESSAGE_FORMAT"; - nativeKey = prefixVendor + "NATIVE"; - firstAcquirerKey = prefixVendor + "FirstAcquirer"; - prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; - prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations; - contentTypeKey = prefixVendor + "ContentType"; - contentEncodingKey = prefixVendor + "ContentEncoding"; - replyToGroupIDKey = prefixVendor + "ReplyToGroupID"; - prefixFooterKey = prefixVendor + prefixFooter; - } - - public ActiveMQJMSVendor getVendor() { - return vendor; - } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 503a05ea8d..33c319eee4 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -37,7 +37,6 @@ import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer; import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer; -import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.activemq.transport.amqp.message.InboundTransformer; import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; @@ -138,14 +137,14 @@ public class AmqpReceiver extends AmqpAbstractReceiver { if (inboundTransformer == null) { String transformer = session.getConnection().getConfiguredTransformer(); if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) { - inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new JMSMappingInboundTransformer(); } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) { - inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new AMQPNativeInboundTransformer(); } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) { - inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new AMQPRawInboundTransformer(); } else { LOG.warn("Unknown transformer type {} using native one instead", transformer); - inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + inboundTransformer = new AMQPNativeInboundTransformer(); } } return inboundTransformer; @@ -157,7 +156,7 @@ public class AmqpReceiver extends AmqpAbstractReceiver { EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length); InboundTransformer transformer = getTransformer(); - ActiveMQMessage message = (ActiveMQMessage) transformer.transform(em); + ActiveMQMessage message = transformer.transform(em); current = null; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 455e0b09af..2531c1aadf 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.protocol; import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT; import java.io.IOException; import java.util.LinkedList; @@ -39,7 +40,6 @@ import org.apache.activemq.command.Response; import org.apache.activemq.command.TransactionId; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.ResponseHandler; -import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer; import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.activemq.transport.amqp.message.OutboundTransformer; @@ -75,11 +75,10 @@ public class AmqpSender extends AmqpAbstractLink { private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; - private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE); + private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(); private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); private final LinkedList outbound = new LinkedList(); private final LinkedList dispatchedInTx = new LinkedList(); - private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; private final ConsumerInfo consumerInfo; private AbstractSubscription subscription; @@ -437,8 +436,8 @@ public class AmqpSender extends AmqpAbstractLink { temp = (ActiveMQMessage) md.getMessage(); } - if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { - temp.setProperty(MESSAGE_FORMAT_KEY, 0); + if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) { + temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0); } } @@ -477,6 +476,7 @@ public class AmqpSender extends AmqpAbstractLink { currentDelivery = getEndpoint().delivery(tag, 0, tag.length); } currentDelivery.setContext(md); + currentDelivery.setMessageFormat((int) amqp.getMessageFormat()); } else { // TODO: message could not be generated what now? } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java index b513c1a8e1..201cee2ff8 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java @@ -87,8 +87,6 @@ public class AmqpTransformerTest { assertTrue(message instanceof BytesMessage); Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); - Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); - assertEquals(0L, messageFormat.longValue()); assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); assertEquals(7, message.getJMSPriority()); @@ -136,8 +134,6 @@ public class AmqpTransformerTest { LOG.info("Recieved message: {}", message); assertTrue(message instanceof BytesMessage); Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); - Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); - assertEquals(0L, messageFormat.longValue()); assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); @@ -184,8 +180,6 @@ public class AmqpTransformerTest { assertTrue(message instanceof TextMessage); Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); - Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); - assertEquals(0L, messageFormat.longValue()); assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed); assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java index 84d586405f..fa61e1423e 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -468,4 +469,46 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport { amqp.close(); openwire.close(); } + + //----- Test Qpid JMS to Qpid JMS interop with transformers --------------// + + @Test + public void testQpidJMSToQpidJMSMessageSendReceive() throws Exception { + final int SIZE = 1024; + final int NUM_MESSAGES = 100; + + Connection amqpSend = createConnection("client-1"); + Connection amqpReceive = createConnection("client-2"); + + amqpReceive.start(); + + Session senderSession = amqpSend.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session receiverSession = amqpReceive.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination queue = senderSession.createQueue(getDestinationName()); + + MessageProducer amqpProducer = senderSession.createProducer(queue); + MessageConsumer amqpConsumer = receiverSession.createConsumer(queue); + + byte[] payload = new byte[SIZE]; + + for (int i = 0; i < NUM_MESSAGES; ++i) { + BytesMessage outgoing = senderSession.createBytesMessage(); + outgoing.setLongProperty("SendTime", System.currentTimeMillis()); + outgoing.writeBytes(payload); + amqpProducer.send(outgoing); + } + + // Now consumer the message + for (int i = 0; i < NUM_MESSAGES; ++i) { + Message received = amqpConsumer.receive(2000); + assertNotNull(received); + assertTrue("Expected BytesMessage but got " + received, received instanceof BytesMessage); + BytesMessage incoming = (BytesMessage) received; + assertEquals(SIZE, incoming.getBodyLength()); + } + + amqpReceive.close(); + amqpSend.close(); + } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 2b1b874975..e5e1bbdead 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -257,7 +257,7 @@ public class AmqpMessage { * @return the set message ID in String form or null if not set. */ public String getMessageId() { - if (message.getProperties() == null) { + if (message.getProperties() == null || message.getProperties().getMessageId() == null) { return null; } @@ -309,7 +309,7 @@ public class AmqpMessage { * @return the set correlation ID in String form or null if not set. */ public String getCorrelationId() { - if (message.getProperties() == null) { + if (message.getProperties() == null || message.getProperties().getCorrelationId() == null) { return null; } @@ -387,7 +387,7 @@ public class AmqpMessage { * @return true if the message is marked as being durable. */ public boolean isDurable() { - if (message.getHeader() == null) { + if (message.getHeader() == null || message.getHeader().getDurable() == null) { return false; } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java index ba0f014128..1427b5a520 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java @@ -51,7 +51,6 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.message.Message; import org.junit.Test; -import org.mockito.Mockito; public class JMSMappingInboundTransformerTest { @@ -65,8 +64,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateBytesMessageFromNoBodySectionAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE); @@ -86,8 +84,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateBytesMessageFromNoBodySectionAndNoContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); @@ -107,8 +104,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateObjectMessageFromNoBodySectionAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE); @@ -122,8 +118,7 @@ public class JMSMappingInboundTransformerTest { @Test public void testCreateTextMessageFromNoBodySectionAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType("text/plain"); @@ -143,8 +138,7 @@ public class JMSMappingInboundTransformerTest { * @throws Exception if an error occurs during the test. */ public void testCreateGenericMessageFromNoBodySectionAndUnknownContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setContentType("unknown-content-type"); @@ -174,8 +168,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -197,8 +190,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -222,8 +214,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -246,8 +237,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -350,8 +340,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -377,8 +366,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -398,8 +386,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -415,8 +402,7 @@ public class JMSMappingInboundTransformerTest { */ @Test public void testCreateObjectMessageFromAmqpValueWithBinaryAndContentType() throws Exception { - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); Message message = Message.Factory.create(); message.setBody(new AmqpValue(new Binary(new byte[0]))); @@ -443,8 +429,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -465,8 +450,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -487,8 +471,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -509,8 +492,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertNotNull("Message should not be null", jmsMessage); @@ -531,8 +513,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); @@ -548,8 +529,7 @@ public class JMSMappingInboundTransformerTest { EncodedMessage em = encodeMessage(message); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(vendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); javax.jms.Message jmsMessage = transformer.transform(em); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); @@ -589,9 +569,7 @@ public class JMSMappingInboundTransformerTest { } private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class expectedClass) throws Exception { - ActiveMQTextMessage mockTextMessage = createMockTextMessage(); - ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); String toAddress = "toAddress"; Message amqp = Message.Factory.create(); @@ -608,11 +586,6 @@ public class JMSMappingInboundTransformerTest { javax.jms.Message jmsMessage = transformer.transform(em); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); - - // Verify that createDestination was called with the provided 'to' - // address and 'Destination' class - // TODO - No need to really test this bit ? - // Mockito.verify(mockVendor).createDestination(toAddress, expectedClass); } //----- ReplyTo Conversions ----------------------------------------------// @@ -643,9 +616,7 @@ public class JMSMappingInboundTransformerTest { } private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class expectedClass) throws Exception { - ActiveMQTextMessage mockTextMessage = createMockTextMessage(); - ActiveMQJMSVendor mockVendor = createMockVendor(mockTextMessage); - JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(); String replyToAddress = "replyToAddress"; Message amqp = Message.Factory.create(); @@ -662,31 +633,10 @@ public class JMSMappingInboundTransformerTest { javax.jms.Message jmsMessage = transformer.transform(em); assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); - - // Verify that createDestination was called with the provided 'replyTo' - // address and 'Destination' class - // TODO - No need to really test this bit ? - // Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass); } //----- Utility Methods --------------------------------------------------// - private ActiveMQTextMessage createMockTextMessage() { - return Mockito.mock(ActiveMQTextMessage.class); - } - - private ActiveMQJMSVendor createMockVendor(ActiveMQTextMessage mockTextMessage) { - ActiveMQJMSVendor mockVendor = Mockito.mock(ActiveMQJMSVendor.class); - Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage); - Mockito.when(mockVendor.createTextMessage(Mockito.any(String.class))).thenReturn(mockTextMessage); - - return mockVendor; - } - - private ActiveMQJMSVendor createVendor() { - return ActiveMQJMSVendor.INSTANCE; - } - private EncodedMessage encodeMessage(Message message) { byte[] encodeBuffer = new byte[1024 * 8]; int encodedSize; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java index 021ab85339..d0d31ccfcc 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java @@ -18,10 +18,10 @@ package org.apache.activemq.transport.amqp.message; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_DATA; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_NULL; -import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_ORIGINAL_ENCODING_KEY; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_SEQUENCE; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_UNKNOWN; import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.AMQP_VALUE_BINARY; +import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_ORIGINAL_ENCODING; import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION; import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION; import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.QUEUE_TYPE; @@ -43,20 +43,20 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import javax.jms.Destination; import javax.jms.JMSException; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -70,6 +70,9 @@ import org.mockito.Mockito; public class JMSMappingOutboundTransformerTest { + private final UUID TEST_OBJECT_VALUE = UUID.fromString("fee14b62-09e0-4ac6-a4c3-4206c630d844"); + private final String TEST_ADDRESS = "queue://testAddress"; + //----- no-body Message type tests ---------------------------------------// @Test @@ -78,10 +81,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNull(amqp.getBody()); } @@ -89,14 +94,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertTextMessageToAmqpMessageWithNoBodyOriginalEncodingWasNull() throws Exception { ActiveMQTextMessage outbound = createTextMessage(); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_NULL); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNull(amqp.getBody()); } @@ -109,10 +116,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -128,10 +137,12 @@ public class JMSMappingOutboundTransformerTest { outbound.storeContent(); outbound.onSend(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -152,10 +163,12 @@ public class JMSMappingOutboundTransformerTest { outbound.storeContent(); outbound.onSend(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -171,14 +184,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertEmptyBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { ActiveMQBytesMessage outbound = createBytesMessage(); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -190,15 +205,17 @@ public class JMSMappingOutboundTransformerTest { public void testConvertUncompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { byte[] expectedPayload = new byte[] { 8, 16, 24, 32 }; ActiveMQBytesMessage outbound = createBytesMessage(); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.writeBytes(expectedPayload); outbound.storeContent(); outbound.onSend(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -215,15 +232,17 @@ public class JMSMappingOutboundTransformerTest { public void testConvertCompressedBytesMessageToAmqpMessageWithAmqpValueBody() throws Exception { byte[] expectedPayload = new byte[] { 8, 16, 24, 32 }; ActiveMQBytesMessage outbound = createBytesMessage(true); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.writeBytes(expectedPayload); outbound.storeContent(); outbound.onSend(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -244,10 +263,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -263,10 +284,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -288,10 +311,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -312,10 +337,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -325,14 +352,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception { ActiveMQStreamMessage outbound = createStreamMessage(); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpSequence); @@ -347,10 +376,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -365,16 +396,18 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedStreamMessageToAmqpMessageWithAmqpSequencey() throws Exception { ActiveMQStreamMessage outbound = createStreamMessage(true); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_SEQUENCE); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE); outbound.writeBoolean(false); outbound.writeString("test"); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpSequence); @@ -394,10 +427,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -407,14 +442,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertEmptyObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { ActiveMQObjectMessage outbound = createObjectMessage(); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -424,14 +461,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertEmptyObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { ActiveMQObjectMessage outbound = createObjectMessage(); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -445,10 +484,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -462,14 +503,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID()); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -483,14 +526,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID()); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -504,14 +549,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedObjectMessageToAmqpMessageWithDataBody() throws Exception { - ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true); + ActiveMQObjectMessage outbound = createObjectMessage(TEST_OBJECT_VALUE, true); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -525,14 +572,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedObjectMessageToAmqpMessageUnknownEncodingGetsDataSection() throws Exception { ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_UNKNOWN); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_UNKNOWN); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -546,14 +595,16 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertCompressedObjectMessageToAmqpMessageWithAmqpValueBody() throws Exception { ActiveMQObjectMessage outbound = createObjectMessage(UUID.randomUUID(), true); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_VALUE_BINARY); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -573,10 +624,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -587,14 +640,16 @@ public class JMSMappingOutboundTransformerTest { public void testConvertTextMessageCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception { String contentString = "myTextMessageContent"; ActiveMQTextMessage outbound = createTextMessage(contentString); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -609,13 +664,15 @@ public class JMSMappingOutboundTransformerTest { public void testConvertTextMessageContentNotStoredCreatesBodyUsingOriginalEncodingWithDataSection() throws Exception { String contentString = "myTextMessageContent"; ActiveMQTextMessage outbound = createTextMessage(contentString); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); outbound.onSend(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -633,10 +690,12 @@ public class JMSMappingOutboundTransformerTest { outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -649,10 +708,12 @@ public class JMSMappingOutboundTransformerTest { ActiveMQTextMessage outbound = createTextMessage(contentString); outbound.onSend(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof AmqpValue); @@ -663,14 +724,16 @@ public class JMSMappingOutboundTransformerTest { public void testConvertCompressedTextMessageCreatesDataSectionBody() throws Exception { String contentString = "myTextMessageContent"; ActiveMQTextMessage outbound = createTextMessage(contentString, true); - outbound.setShortProperty(AMQP_ORIGINAL_ENCODING_KEY, AMQP_DATA); + outbound.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA); outbound.onSend(); outbound.storeContent(); - ActiveMQJMSVendor vendor = createVendor(); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(vendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(outbound); + EncodedMessage encoded = transformer.transform(outbound); + assertNotNull(encoded); + + Message amqp = encoded.decode(); assertNotNull(amqp.getBody()); assertTrue(amqp.getBody() instanceof Data); @@ -690,45 +753,35 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertMessageWithJMSDestinationQueue() throws Exception { - Queue mockDest = Mockito.mock(Queue.class); - - doTestConvertMessageWithJMSDestination(mockDest, QUEUE_TYPE); + doTestConvertMessageWithJMSDestination(createMockDestination(QUEUE_TYPE), QUEUE_TYPE); } @Test public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception { - TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class); - - doTestConvertMessageWithJMSDestination(mockDest, TEMP_QUEUE_TYPE); + doTestConvertMessageWithJMSDestination(createMockDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE); } @Test public void testConvertMessageWithJMSDestinationTopic() throws Exception { - Topic mockDest = Mockito.mock(Topic.class); - - doTestConvertMessageWithJMSDestination(mockDest, TOPIC_TYPE); + doTestConvertMessageWithJMSDestination(createMockDestination(TOPIC_TYPE), TOPIC_TYPE); } @Test public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception { - TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class); - - doTestConvertMessageWithJMSDestination(mockDest, TEMP_TOPIC_TYPE); + doTestConvertMessageWithJMSDestination(createMockDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE); } - private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception { + private void doTestConvertMessageWithJMSDestination(ActiveMQDestination jmsDestination, Object expectedAnnotationValue) throws Exception { ActiveMQTextMessage mockTextMessage = createMockTextMessage(); Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent"); - Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination); - ActiveMQJMSVendor mockVendor = createMockVendor(); - String toAddress = "someToAddress"; - if (jmsDestination != null) { - Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(toAddress); - } + Mockito.when(mockTextMessage.getDestination()).thenReturn(jmsDestination); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(mockTextMessage); + EncodedMessage encoded = transformer.transform(mockTextMessage); + assertNotNull(encoded); + + Message amqp = encoded.decode(); MessageAnnotations ma = amqp.getMessageAnnotations(); Map maMap = ma == null ? null : ma.getValue(); @@ -740,7 +793,7 @@ public class JMSMappingOutboundTransformerTest { } if (jmsDestination != null) { - assertEquals("Unexpected 'to' address", toAddress, amqp.getAddress()); + assertEquals("Unexpected 'to' address", jmsDestination.getQualifiedName(), amqp.getAddress()); } } @@ -753,45 +806,35 @@ public class JMSMappingOutboundTransformerTest { @Test public void testConvertMessageWithJMSReplyToQueue() throws Exception { - Queue mockDest = Mockito.mock(Queue.class); - - doTestConvertMessageWithJMSReplyTo(mockDest, QUEUE_TYPE); + doTestConvertMessageWithJMSReplyTo(createMockDestination(QUEUE_TYPE), QUEUE_TYPE); } @Test public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception { - TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class); - - doTestConvertMessageWithJMSReplyTo(mockDest, TEMP_QUEUE_TYPE); + doTestConvertMessageWithJMSReplyTo(createMockDestination(TEMP_QUEUE_TYPE), TEMP_QUEUE_TYPE); } @Test public void testConvertMessageWithJMSReplyToTopic() throws Exception { - Topic mockDest = Mockito.mock(Topic.class); - - doTestConvertMessageWithJMSReplyTo(mockDest, TOPIC_TYPE); + doTestConvertMessageWithJMSReplyTo(createMockDestination(TOPIC_TYPE), TOPIC_TYPE); } @Test public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception { - TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class); - - doTestConvertMessageWithJMSReplyTo(mockDest, TEMP_TOPIC_TYPE); + doTestConvertMessageWithJMSReplyTo(createMockDestination(TEMP_TOPIC_TYPE), TEMP_TOPIC_TYPE); } - private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception { + private void doTestConvertMessageWithJMSReplyTo(ActiveMQDestination jmsReplyTo, Object expectedAnnotationValue) throws Exception { ActiveMQTextMessage mockTextMessage = createMockTextMessage(); Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent"); - Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo); - ActiveMQJMSVendor mockVendor = createMockVendor(); - String replyToAddress = "someReplyToAddress"; - if (jmsReplyTo != null) { - Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(replyToAddress); - } + Mockito.when(mockTextMessage.getReplyTo()).thenReturn(jmsReplyTo); - JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(); - Message amqp = transformer.convert(mockTextMessage); + EncodedMessage encoded = transformer.transform(mockTextMessage); + assertNotNull(encoded); + + Message amqp = encoded.decode(); MessageAnnotations ma = amqp.getMessageAnnotations(); Map maMap = ma == null ? null : ma.getValue(); @@ -803,7 +846,7 @@ public class JMSMappingOutboundTransformerTest { } if (jmsReplyTo != null) { - assertEquals("Unexpected 'reply-to' address", replyToAddress, amqp.getReplyTo()); + assertEquals("Unexpected 'reply-to' address", jmsReplyTo.getQualifiedName(), amqp.getReplyTo()); } } @@ -812,16 +855,35 @@ public class JMSMappingOutboundTransformerTest { private ActiveMQTextMessage createMockTextMessage() throws Exception { ActiveMQTextMessage mockTextMessage = Mockito.mock(ActiveMQTextMessage.class); Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet())); + Mockito.when(mockTextMessage.getPriority()).thenReturn((byte) Message.DEFAULT_PRIORITY); return mockTextMessage; } - private ActiveMQJMSVendor createVendor() { - return ActiveMQJMSVendor.INSTANCE; - } + private ActiveMQDestination createMockDestination(byte destType) { + ActiveMQDestination mockDestination = null; + switch (destType) { + case QUEUE_TYPE: + mockDestination = Mockito.mock(ActiveMQQueue.class); + Mockito.when(mockDestination.getQualifiedName()).thenReturn("queue://" + TEST_ADDRESS); + break; + case TOPIC_TYPE: + mockDestination = Mockito.mock(ActiveMQTopic.class); + Mockito.when(mockDestination.getQualifiedName()).thenReturn("topic://" + TEST_ADDRESS); + break; + case TEMP_QUEUE_TYPE: + mockDestination = Mockito.mock(ActiveMQTempQueue.class); + Mockito.when(mockDestination.getQualifiedName()).thenReturn("tempQueue://" + TEST_ADDRESS); + break; + case TEMP_TOPIC_TYPE: + mockDestination = Mockito.mock(ActiveMQTempTopic.class); + Mockito.when(mockDestination.getQualifiedName()).thenReturn("tempTopic://" + TEST_ADDRESS); + break; + default: + throw new IllegalArgumentException("Invliad Destination Type given/"); + } - private ActiveMQJMSVendor createMockVendor() { - return Mockito.mock(ActiveMQJMSVendor.class); + return mockDestination; } private ActiveMQMessage createMessage() { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java index 98d6722040..204c65264f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSTransformationSpeedComparisonTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.message; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; @@ -52,7 +53,7 @@ import org.slf4j.LoggerFactory; /** * Some simple performance tests for the Message Transformers. */ -@Ignore("Turn on to profile.") +@Ignore("Enable for profiling") @RunWith(Parameterized.class) public class JMSTransformationSpeedComparisonTest { @@ -63,7 +64,7 @@ public class JMSTransformationSpeedComparisonTest { private final String transformer; - private final int WARM_CYCLES = 50; + private final int WARM_CYCLES = 1000; private final int PROFILE_CYCLES = 1000000; public JMSTransformationSpeedComparisonTest(String transformer) { @@ -82,11 +83,11 @@ public class JMSTransformationSpeedComparisonTest { private InboundTransformer getInboundTransformer() { switch (transformer) { case "raw": - return new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE); + return new AMQPRawInboundTransformer(); case "native": - return new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + return new AMQPNativeInboundTransformer(); default: - return new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); + return new JMSMappingInboundTransformer(); } } @@ -94,9 +95,9 @@ public class JMSTransformationSpeedComparisonTest { switch (transformer) { case "raw": case "native": - return new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE); + return new AMQPNativeOutboundTransformer(); default: - return new JMSMappingOutboundTransformer(ActiveMQJMSVendor.INSTANCE); + return new JMSMappingOutboundTransformer(); } } @@ -113,7 +114,7 @@ public class JMSTransformationSpeedComparisonTest { // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -122,7 +123,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -149,7 +150,7 @@ public class JMSTransformationSpeedComparisonTest { // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -158,7 +159,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -177,7 +178,7 @@ public class JMSTransformationSpeedComparisonTest { // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -186,7 +187,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -205,7 +206,7 @@ public class JMSTransformationSpeedComparisonTest { // Warm up for (int i = 0; i < WARM_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -214,7 +215,7 @@ public class JMSTransformationSpeedComparisonTest { long startTime = System.nanoTime(); for (int i = 0; i < PROFILE_CYCLES; ++i) { - ActiveMQMessage intermediate = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); intermediate.onSend(); outboundTransformer.transform(intermediate); } @@ -255,7 +256,7 @@ public class JMSTransformationSpeedComparisonTest { InboundTransformer inboundTransformer = getInboundTransformer(); OutboundTransformer outboundTransformer = getOutboundTransformer(); - ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage outbound = inboundTransformer.transform(encoded); outbound.onSend(); // Warm up @@ -276,7 +277,6 @@ public class JMSTransformationSpeedComparisonTest { transformer, PROFILE_CYCLES, TimeUnit.NANOSECONDS.toMillis(totalDuration), test.getMethodName()); } - @Ignore @Test public void testEncodeDecodeIsWorking() throws Exception { Message incomingMessage = createTypicalQpidJMSMessage(); @@ -284,7 +284,7 @@ public class JMSTransformationSpeedComparisonTest { InboundTransformer inboundTransformer = getInboundTransformer(); OutboundTransformer outboundTransformer = getOutboundTransformer(); - ActiveMQMessage outbound = (ActiveMQMessage) inboundTransformer.transform(encoded); + ActiveMQMessage outbound = inboundTransformer.transform(encoded); outbound.onSend(); Message outboudMessage = outboundTransformer.transform(outbound).decode(); @@ -319,6 +319,25 @@ public class JMSTransformationSpeedComparisonTest { assertEquals(incomingBody.getValue(), outgoingBody.getValue()); } + @Test + public void testBodyOnlyEncodeDecode() throws Exception { + + Message incomingMessage = Proton.message(); + + incomingMessage.setBody(new AmqpValue("String payload for AMQP message conversion performance testing.")); + + EncodedMessage encoded = encode(incomingMessage); + InboundTransformer inboundTransformer = getInboundTransformer(); + OutboundTransformer outboundTransformer = getOutboundTransformer(); + + ActiveMQMessage intermediate = inboundTransformer.transform(encoded); + intermediate.onSend(); + Message outboudMessage = outboundTransformer.transform(intermediate).decode(); + + assertNull(outboudMessage.getHeader()); + assertNull(outboudMessage.getProperties()); + } + private Message createTypicalQpidJMSMessage() { Map applicationProperties = new HashMap(); Map messageAnnotations = new HashMap();