From 6e693196068d10d177a57e01c823da0ecaaed9a6 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 17 Feb 2015 15:42:45 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5592 Initial drop of the JMS transformer code to be reworked. --- activemq-amqp/pom.xml | 7 +- .../transport/amqp/ActiveMQJMSVendor.java | 4 +- .../transport/amqp/AmqpProtocolConverter.java | 14 +- .../transport/amqp/AmqpTransportFilter.java | 2 +- .../message/AMQPNativeInboundTransformer.java | 36 ++ .../AMQPNativeOutboundTransformer.java | 103 +++++ .../message/AMQPRawInboundTransformer.java | 47 +++ .../amqp/message/AutoOutboundTransformer.java | 52 +++ .../amqp/message/EncodedMessage.java | 67 ++++ .../amqp/message/InboundTransformer.java | 360 ++++++++++++++++++ .../message/JMSMappingInboundTransformer.java | 109 ++++++ .../JMSMappingOutboundTransformer.java | 314 +++++++++++++++ .../transport/amqp/message/JMSVendor.java | 65 ++++ .../amqp/message/OutboundTransformer.java | 87 +++++ .../JMSMappingInboundTransformerTest.java | 259 +++++++++++++ .../JMSMappingOutboundTransformerTest.java | 309 +++++++++++++++ .../activemq-maven-plugin/pom.xml | 2 - pom.xml | 29 +- 18 files changed, 1840 insertions(+), 26 deletions(-) create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 35a3177895..358422d469 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -42,7 +42,7 @@ org.apache.qpid - proton-jms + proton-j ${qpid-proton-version} @@ -108,6 +108,11 @@ junit test + + org.mockito + mockito-core + test + org.slf4j slf4j-log4j12 diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java index b576c6b35f..c00a390520 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java @@ -39,10 +39,8 @@ import org.apache.activemq.command.ActiveMQTempQueue; import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.qpid.proton.jms.JMSVendor; +import org.apache.activemq.transport.amqp.message.JMSVendor; -/** - */ public class ActiveMQJMSVendor extends JMSVendor { final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor(); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 425b264bee..16bfa3d2c0 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -63,6 +63,13 @@ import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.PersistenceAdapterSupport; +import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer; +import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer; +import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer; +import org.apache.activemq.transport.amqp.message.EncodedMessage; +import org.apache.activemq.transport.amqp.message.InboundTransformer; +import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; +import org.apache.activemq.transport.amqp.message.OutboundTransformer; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.LongSequenceGenerator; @@ -102,13 +109,6 @@ import org.apache.qpid.proton.engine.impl.CollectorImpl; import org.apache.qpid.proton.engine.impl.ProtocolTracer; import org.apache.qpid.proton.engine.impl.TransportImpl; import org.apache.qpid.proton.framing.TransportFrame; -import org.apache.qpid.proton.jms.AMQPNativeInboundTransformer; -import org.apache.qpid.proton.jms.AMQPRawInboundTransformer; -import org.apache.qpid.proton.jms.AutoOutboundTransformer; -import org.apache.qpid.proton.jms.EncodedMessage; -import org.apache.qpid.proton.jms.InboundTransformer; -import org.apache.qpid.proton.jms.JMSMappingInboundTransformer; -import org.apache.qpid.proton.jms.OutboundTransformer; import org.apache.qpid.proton.message.Message; import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.ByteArrayOutputStream; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index 3e361c22b1..fb7542b512 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -25,10 +25,10 @@ import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.amqp.message.InboundTransformer; import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; -import org.apache.qpid.proton.jms.InboundTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java new file mode 100644 index 0000000000..9789e7b78f --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeInboundTransformer.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import javax.jms.Message; + +public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { + + public AMQPNativeInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); + + Message rc = super.transform(amqpMessage); + + populateMessage(rc, amqp); + return rc; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java new file mode 100644 index 0000000000..ebe6fcca66 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import java.nio.ByteBuffer; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageFormatException; + +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.codec.CompositeWritableBuffer; +import org.apache.qpid.proton.codec.DroppingWritableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.message.ProtonJMessage; + +public class AMQPNativeOutboundTransformer extends OutboundTransformer { + + public AMQPNativeOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public EncodedMessage transform(Message msg) throws Exception { + if( msg == null ) + return null; + if( !(msg instanceof BytesMessage) ) + return null; + try { + if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) { + return null; + } + } catch (MessageFormatException e) { + return null; + } + return transform(this, (BytesMessage) msg); + } + + static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { + long messageFormat; + try { + messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT"); + } catch (MessageFormatException e) { + return null; + } + byte data[] = new byte[(int) msg.getBodyLength()]; + int dataSize = data.length; + msg.readBytes(data); + msg.reset(); + + try { + int count = msg.getIntProperty("JMSXDeliveryCount"); + if( count > 1 ) { + + // decode... + ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); + int offset = 0; + int len = data.length; + while( len > 0 ) { + final int decoded = amqp.decode(data, offset, len); + assert decoded > 0: "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + // Update the DeliveryCount header... + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. + amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1)); + + // Re-encode... + ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]); + final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); + int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); + if( overflow.position() > 0 ) { + buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]); + c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); + } + data = buffer.array(); + dataSize = c; + } + } catch (JMSException e) { + } + + return new EncodedMessage(messageFormat, data, 0, dataSize); + } + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java new file mode 100644 index 0000000000..20703d7cb3 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import javax.jms.BytesMessage; +import javax.jms.Message; + +public class AMQPRawInboundTransformer extends InboundTransformer { + + public AMQPRawInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + BytesMessage rc = vendor.createBytesMessage(); + rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); + + rc.setJMSDeliveryMode(defaultDeliveryMode); + rc.setJMSPriority(defaultPriority); + + final long now = System.currentTimeMillis(); + rc.setJMSTimestamp(now); + if( defaultTtl > 0 ) { + rc.setJMSExpiration(now + defaultTtl); + } + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); + rc.setBooleanProperty(prefixVendor + "NATIVE", true); + + return rc; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java new file mode 100644 index 0000000000..0f0d7b2047 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AutoOutboundTransformer.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import javax.jms.BytesMessage; +import javax.jms.Message; + +public class AutoOutboundTransformer extends JMSMappingOutboundTransformer { + + private final JMSMappingOutboundTransformer transformer; + + public AutoOutboundTransformer(JMSVendor vendor) { + super(vendor); + transformer = new JMSMappingOutboundTransformer(vendor); + } + + @Override + public EncodedMessage transform(Message msg) throws Exception { + if( msg == null ) + return null; + if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) { + if( msg instanceof BytesMessage ) { + return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg); + } else { + return null; + } + } else { + return transformer.transform(msg); + } + } + + @Override + public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations) + { + super.setUseByteDestinationTypeAnnotations(useByteDestinationTypeAnnotations); + transformer.setUseByteDestinationTypeAnnotations(useByteDestinationTypeAnnotations); + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java new file mode 100644 index 0000000000..733c0ec036 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/EncodedMessage.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.message.Message; + +public class EncodedMessage { + + private final Binary data; + final long messageFormat; + + public EncodedMessage(long messageFormat, byte[] data, int offset, int length) { + this.data = new Binary(data, offset, length); + this.messageFormat = messageFormat; + } + + public long getMessageFormat() { + return messageFormat; + } + + public Message decode() throws Exception { + Message amqp = Message.Factory.create(); + + int offset = getArrayOffset(); + int len = getLength(); + while (len > 0) { + final int decoded = amqp.decode(getArray(), offset, len); + assert decoded > 0 : "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + return amqp; + } + + public int getLength() { + return data.getLength(); + } + + public int getArrayOffset() { + return data.getArrayOffset(); + } + + public byte[] getArray() { + return data.getArray(); + } + + @Override + public String toString() { + return data.toString(); + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java new file mode 100644 index 0000000000..9e5758cde8 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Decimal128; +import org.apache.qpid.proton.amqp.Decimal32; +import org.apache.qpid.proton.amqp.Decimal64; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.apache.qpid.proton.amqp.UnsignedShort; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; + +public abstract class InboundTransformer { + + JMSVendor vendor; + + public static final String TRANSFORMER_NATIVE = "native"; + public static final String TRANSFORMER_RAW = "raw"; + public static final String TRANSFORMER_JMS = "jms"; + + String prefixVendor = "JMS_AMQP_"; + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations = "MA_"; + String prefixFooter = "FT_"; + + int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; + int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; + long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; + + private boolean useByteDestinationTypeAnnotations = false; + + public InboundTransformer(JMSVendor vendor) { + this.vendor = vendor; + } + + abstract public Message transform(EncodedMessage amqpMessage) throws Exception; + + public boolean isUseByteDestinationTypeAnnotations() { + return useByteDestinationTypeAnnotations; + } + + public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations) { + this.useByteDestinationTypeAnnotations = useByteDestinationTypeAnnotations; + } + + public int getDefaultDeliveryMode() { + return defaultDeliveryMode; + } + + public void setDefaultDeliveryMode(int defaultDeliveryMode) { + this.defaultDeliveryMode = defaultDeliveryMode; + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public long getDefaultTtl() { + return defaultTtl; + } + + public void setDefaultTtl(long defaultTtl) { + this.defaultTtl = defaultTtl; + } + + public String getPrefixVendor() { + return prefixVendor; + } + + public void setPrefixVendor(String prefixVendor) { + this.prefixVendor = prefixVendor; + } + + public JMSVendor getVendor() { + return vendor; + } + + public void setVendor(JMSVendor vendor) { + this.vendor = vendor; + } + + @SuppressWarnings("unchecked") + protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + Header header = amqp.getHeader(); + if (header == null) { + header = new Header(); + } + + if (header.getDurable() != null) { + jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } else { + jms.setJMSDeliveryMode(defaultDeliveryMode); + } + if (header.getPriority() != null) { + jms.setJMSPriority(header.getPriority().intValue()); + } else { + jms.setJMSPriority(defaultPriority); + } + if (header.getFirstAcquirer() != null) { + jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); + } + if (header.getDeliveryCount() != null) { + vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); + } + + final DeliveryAnnotations da = amqp.getDeliveryAnnotations(); + if (da != null) { + for (Map.Entry entry : da.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue()); + } + } + + Class toAttributes = null; + Class replyToAttributes = null; + + if (isUseByteDestinationTypeAnnotations()) { + toAttributes = Queue.class; + replyToAttributes = Queue.class; + } else { + toAttributes = Destination.class; + replyToAttributes = Destination.class; + } + + final MessageAnnotations ma = amqp.getMessageAnnotations(); + if (ma != null) { + for (Map.Entry entry : ma.getValue().entrySet()) { + String key = entry.getKey().toString(); + if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) { + jms.setJMSType(entry.getValue().toString()); + } else if ("x-opt-to-type".equals(key.toString())) { + toAttributes = toClassFromAttributes(entry.getValue()); + } else if ("x-opt-reply-type".equals(key.toString())) { + replyToAttributes = toClassFromAttributes(entry.getValue()); + } else { + setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); + } + } + } + + final ApplicationProperties ap = amqp.getApplicationProperties(); + if (ap != null) { + for (Map.Entry entry : (Set>) ap.getValue().entrySet()) { + String key = entry.getKey().toString(); + if ("JMSXGroupID".equals(key)) { + vendor.setJMSXGroupID(jms, entry.getValue().toString()); + } else if ("JMSXGroupSequence".equals(key)) { + vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue()); + } else if ("JMSXUserID".equals(key)) { + vendor.setJMSXUserID(jms, entry.getValue().toString()); + } else { + setProperty(jms, key, entry.getValue()); + } + } + } + + final Properties properties = amqp.getProperties(); + if (properties != null) { + if (properties.getMessageId() != null) { + jms.setJMSMessageID(properties.getMessageId().toString()); + } + Binary userId = properties.getUserId(); + if (userId != null) { + vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); + } + if (properties.getTo() != null) { + jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes)); + } + if (properties.getSubject() != null) { + jms.setStringProperty(prefixVendor + "Subject", properties.getSubject()); + } + if (properties.getReplyTo() != null) { + jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes)); + } + if (properties.getCorrelationId() != null) { + jms.setJMSCorrelationID(properties.getCorrelationId().toString()); + } + if (properties.getContentType() != null) { + jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); + } + if (properties.getContentEncoding() != null) { + jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); + } + if (properties.getCreationTime() != null) { + jms.setJMSTimestamp(properties.getCreationTime().getTime()); + } + if (properties.getGroupId() != null) { + vendor.setJMSXGroupID(jms, properties.getGroupId()); + } + if (properties.getGroupSequence() != null) { + vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); + } + if (properties.getReplyToGroupId() != null) { + jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); + } + if (properties.getAbsoluteExpiryTime() != null) { + jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime()); + } + } + + // If the jms expiration has not yet been set... + if (jms.getJMSExpiration() == 0) { + // Then lets try to set it based on the message ttl. + long ttl = defaultTtl; + if (header.getTtl() != null) { + ttl = header.getTtl().longValue(); + } + if (ttl == 0) { + jms.setJMSExpiration(0); + } else { + jms.setJMSExpiration(System.currentTimeMillis() + ttl); + } + } + + final Footer fp = amqp.getFooter(); + if (fp != null) { + for (Map.Entry entry : (Set>) fp.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); + } + } + } + + private static final Set QUEUE_ATTRIBUTES = createSet("queue"); + private static final Set TOPIC_ATTRIBUTES = createSet("topic"); + private static final Set TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary"); + private static final Set TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary"); + + private static Set createSet(String... args) { + HashSet s = new HashSet(); + for (String arg : args) { + s.add(arg); + } + return Collections.unmodifiableSet(s); + } + + Class toClassFromAttributes(Object value) { + if (isUseByteDestinationTypeAnnotations()) { + if (value instanceof Byte) { + switch ((Byte) value) { + case JMSVendor.QUEUE_TYPE: + return Queue.class; + case JMSVendor.TOPIC_TYPE: + return Topic.class; + case JMSVendor.TEMP_QUEUE_TYPE: + return TemporaryQueue.class; + case JMSVendor.TEMP_TOPIC_TYPE: + return TemporaryTopic.class; + default: + return Queue.class; + } + } + + return Queue.class; + } else { + if (value == null) { + return null; + } + String valueString = value.toString(); + HashSet attributes = new HashSet(); + for (String x : valueString.split("\\s*,\\s*")) { + attributes.add(x); + } + + if (QUEUE_ATTRIBUTES.equals(attributes)) { + return Queue.class; + } + if (TOPIC_ATTRIBUTES.equals(attributes)) { + return Topic.class; + } + if (TEMP_QUEUE_ATTRIBUTES.equals(attributes)) { + return TemporaryQueue.class; + } + if (TEMP_TOPIC_ATTRIBUTES.equals(attributes)) { + return TemporaryTopic.class; + } + return Destination.class; + } + } + + private void setProperty(Message msg, String key, Object value) throws JMSException { + if (value instanceof UnsignedLong) { + long v = ((UnsignedLong) value).longValue(); + msg.setLongProperty(key, v); + } else if (value instanceof UnsignedInteger) { + long v = ((UnsignedInteger) value).longValue(); + if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { + msg.setIntProperty(key, (int) v); + } else { + msg.setLongProperty(key, v); + } + } else if (value instanceof UnsignedShort) { + int v = ((UnsignedShort) value).intValue(); + if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { + msg.setShortProperty(key, (short) v); + } else { + msg.setIntProperty(key, v); + } + } else if (value instanceof UnsignedByte) { + short v = ((UnsignedByte) value).shortValue(); + if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { + msg.setByteProperty(key, (byte) v); + } else { + msg.setShortProperty(key, v); + } + } else if (value instanceof Symbol) { + msg.setStringProperty(key, value.toString()); + } else if (value instanceof Decimal128) { + msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); + } else if (value instanceof Decimal64) { + msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); + } else if (value instanceof Decimal32) { + msg.setFloatProperty(key, ((Decimal32) value).floatValue()); + } else if (value instanceof Binary) { + msg.setStringProperty(key, value.toString()); + } else { + msg.setObjectProperty(key, value); + } + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java new file mode 100644 index 0000000000..63e216cb6d --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformer.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jms.BytesMessage; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; + +public class JMSMappingInboundTransformer extends InboundTransformer { + + public JMSMappingInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @SuppressWarnings({ "unchecked" }) + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); + + Message rc; + final Section body = amqp.getBody(); + if (body == null) { + rc = vendor.createMessage(); + } else if (body instanceof Data) { + Binary d = ((Data) body).getValue(); + BytesMessage m = vendor.createBytesMessage(); + m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); + rc = m; + } else if (body instanceof AmqpSequence) { + AmqpSequence sequence = (AmqpSequence) body; + StreamMessage m = vendor.createStreamMessage(); + for (Object item : sequence.getValue()) { + m.writeObject(item); + } + rc = m; + } else if (body instanceof AmqpValue) { + Object value = ((AmqpValue) body).getValue(); + if (value == null) { + rc = vendor.createObjectMessage(); + } + if (value instanceof String) { + TextMessage m = vendor.createTextMessage(); + m.setText((String) value); + rc = m; + } else if (value instanceof Binary) { + Binary d = (Binary) value; + BytesMessage m = vendor.createBytesMessage(); + m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); + rc = m; + } else if (value instanceof List) { + StreamMessage m = vendor.createStreamMessage(); + for (Object item : (List) value) { + m.writeObject(item); + } + rc = m; + } else if (value instanceof Map) { + MapMessage m = vendor.createMapMessage(); + final Set> set = ((Map) value).entrySet(); + for (Map.Entry entry : set) { + m.setObject(entry.getKey(), entry.getValue()); + } + rc = m; + } else { + ObjectMessage m = vendor.createObjectMessage(); + m.setObject((Serializable) value); + rc = m; + } + } else { + throw new RuntimeException("Unexpected body type: " + body.getClass()); + } + rc.setJMSDeliveryMode(defaultDeliveryMode); + rc.setJMSPriority(defaultPriority); + rc.setJMSExpiration(defaultTtl); + + populateMessage(rc, amqp); + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); + rc.setBooleanProperty(prefixVendor + "NATIVE", false); + return rc; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java new file mode 100644 index 0000000000..768bb24a93 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -0,0 +1,314 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.CompositeWritableBuffer; +import org.apache.qpid.proton.codec.DroppingWritableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.message.ProtonJMessage; + +public class JMSMappingOutboundTransformer extends OutboundTransformer { + + public JMSMappingOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public EncodedMessage transform(Message msg) throws Exception { + if (msg == null) { + return null; + } + + try { + if (msg.getBooleanProperty(prefixVendor + "NATIVE")) { + return null; + } + } catch (MessageFormatException e) { + return null; + } + ProtonJMessage amqp = convert(msg); + + long messageFormat; + try { + messageFormat = msg.getLongProperty(this.messageFormatKey); + } catch (MessageFormatException e) { + return null; + } + + ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]); + final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); + int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); + if (overflow.position() > 0) { + buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); + c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); + } + + return new EncodedMessage(messageFormat, buffer.array(), 0, c); + } + + /** + * Perform the conversion between JMS Message and Proton Message without + * re-encoding it to array. This is needed because some frameworks may elect + * to do this on their own way (Netty for instance using Nettybuffers) + * + * @param msg + * @return + * @throws Exception + */ + public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException { + Header header = new Header(); + Properties props = new Properties(); + HashMap daMap = null; + HashMap maMap = null; + HashMap apMap = null; + Section body = null; + HashMap footerMap = null; + if (msg instanceof BytesMessage) { + BytesMessage m = (BytesMessage) msg; + byte data[] = new byte[(int) m.getBodyLength()]; + m.readBytes(data); + m.reset(); // Need to reset after readBytes or future readBytes + // calls (ex: redeliveries) will fail and return -1 + body = new Data(new Binary(data)); + } + if (msg instanceof TextMessage) { + body = new AmqpValue(((TextMessage) msg).getText()); + } + if (msg instanceof MapMessage) { + final HashMap map = new HashMap(); + final MapMessage m = (MapMessage) msg; + final Enumeration names = m.getMapNames(); + while (names.hasMoreElements()) { + String key = names.nextElement(); + map.put(key, m.getObject(key)); + } + body = new AmqpValue(map); + } + if (msg instanceof StreamMessage) { + ArrayList list = new ArrayList(); + final StreamMessage m = (StreamMessage) msg; + try { + while (true) { + list.add(m.readObject()); + } + } catch (MessageEOFException e) { + } + body = new AmqpSequence(list); + } + if (msg instanceof ObjectMessage) { + body = new AmqpValue(((ObjectMessage) msg).getObject()); + } + + header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); + header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); + if (msg.getJMSType() != null) { + if (maMap == null) { + maMap = new HashMap(); + } + maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType()); + } + if (msg.getJMSMessageID() != null) { + props.setMessageId(msg.getJMSMessageID()); + } + if (msg.getJMSDestination() != null) { + props.setTo(vendor.toAddress(msg.getJMSDestination())); + if (maMap == null) { + maMap = new HashMap(); + } + maMap.put(Symbol.valueOf("x-opt-to-type"), destinationAttributes(msg.getJMSDestination())); + } + if (msg.getJMSReplyTo() != null) { + props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo())); + if (maMap == null) { + maMap = new HashMap(); + } + maMap.put(Symbol.valueOf("x-opt-reply-type"), destinationAttributes(msg.getJMSReplyTo())); + } + if (msg.getJMSCorrelationID() != null) { + props.setCorrelationId(msg.getJMSCorrelationID()); + } + if (msg.getJMSExpiration() != 0) { + long ttl = msg.getJMSExpiration() - System.currentTimeMillis(); + if (ttl < 0) { + ttl = 1; + } + header.setTtl(new UnsignedInteger((int) ttl)); + + props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration())); + } + if (msg.getJMSTimestamp() != 0) { + props.setCreationTime(new Date(msg.getJMSTimestamp())); + } + + final Enumeration keys = msg.getPropertyNames(); + while (keys.hasMoreElements()) { + String key = keys.nextElement(); + if (key.equals(messageFormatKey) || key.equals(nativeKey)) { + // skip.. + } else if (key.equals(firstAcquirerKey)) { + header.setFirstAcquirer(msg.getBooleanProperty(key)); + } else if (key.startsWith("JMSXDeliveryCount")) { + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. + int amqpDeliveryCount = msg.getIntProperty(key) - 1; + if (amqpDeliveryCount > 0) { + header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); + } + } else if (key.startsWith("JMSXUserID")) { + String value = msg.getStringProperty(key); + props.setUserId(new Binary(value.getBytes("UTF-8"))); + } else if (key.startsWith("JMSXGroupID")) { + String value = msg.getStringProperty(key); + props.setGroupId(value); + if (apMap == null) { + apMap = new HashMap(); + } + apMap.put(key, value); + } else if (key.startsWith("JMSXGroupSeq")) { + UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key)); + props.setGroupSequence(value); + if (apMap == null) { + apMap = new HashMap(); + } + apMap.put(key, value); + } else if (key.startsWith(prefixDeliveryAnnotationsKey)) { + if (daMap == null) { + daMap = new HashMap(); + } + String name = key.substring(prefixDeliveryAnnotationsKey.length()); + daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); + } else if (key.startsWith(prefixMessageAnnotationsKey)) { + if (maMap == null) { + maMap = new HashMap(); + } + String name = key.substring(prefixMessageAnnotationsKey.length()); + maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); + } else if (key.equals(subjectKey)) { + props.setSubject(msg.getStringProperty(key)); + } else if (key.equals(contentTypeKey)) { + props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); + } else if (key.equals(contentEncodingKey)) { + props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key))); + } else if (key.equals(replyToGroupIDKey)) { + props.setReplyToGroupId(msg.getStringProperty(key)); + } else if (key.startsWith(prefixFooterKey)) { + if (footerMap == null) { + footerMap = new HashMap(); + } + String name = key.substring(prefixFooterKey.length()); + footerMap.put(name, msg.getObjectProperty(key)); + } else { + if (apMap == null) { + apMap = new HashMap(); + } + apMap.put(key, msg.getObjectProperty(key)); + } + } + + MessageAnnotations ma = null; + if (maMap != null) { + ma = new MessageAnnotations(maMap); + } + DeliveryAnnotations da = null; + if (daMap != null) { + da = new DeliveryAnnotations(daMap); + } + ApplicationProperties ap = null; + if (apMap != null) { + ap = new ApplicationProperties(apMap); + } + Footer footer = null; + if (footerMap != null) { + footer = new Footer(footerMap); + } + + return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer); + } + + private Object destinationAttributes(Destination destination) { + if (isUseByteDestinationTypeAnnotations()) { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return JMSVendor.TEMP_QUEUE_TYPE; + } else { + return JMSVendor.QUEUE_TYPE; + } + } + if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return JMSVendor.TEMP_TOPIC_TYPE; + } else { + return JMSVendor.TOPIC_TYPE; + } + } + return JMSVendor.QUEUE_TYPE; + } else { + if (destination instanceof Queue) { + if (destination instanceof TemporaryQueue) { + return "temporary,queue"; + } else { + return "queue"; + } + } + if (destination instanceof Topic) { + if (destination instanceof TemporaryTopic) { + return "temporary,topic"; + } else { + return "topic"; + } + } + return ""; + } + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java new file mode 100644 index 0000000000..33d77c40ce --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +abstract public class JMSVendor { + + public static final byte QUEUE_TYPE = 0x00; + public static final byte TOPIC_TYPE = 0x01; + public static final byte TEMP_QUEUE_TYPE = 0x02; + public static final byte TEMP_TOPIC_TYPE = 0x03; + + public abstract BytesMessage createBytesMessage(); + + public abstract StreamMessage createStreamMessage(); + + public abstract Message createMessage(); + + public abstract TextMessage createTextMessage(); + + public abstract ObjectMessage createObjectMessage(); + + public abstract MapMessage createMapMessage(); + + public abstract void setJMSXUserID(Message message, String value); + + @Deprecated + public Destination createDestination(String name) { + return null; + } + + public T createDestination(String name, Class kind) { + return kind.cast(createDestination(name)); + } + + public abstract void setJMSXGroupID(Message message, String groupId); + + public abstract void setJMSXGroupSequence(Message message, int value); + + public abstract void setJMSXDeliveryCount(Message message, long value); + + public abstract String toAddress(Destination destination); + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java new file mode 100644 index 0000000000..6c7d8adcb8 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import javax.jms.Message; + +public abstract class OutboundTransformer { + + JMSVendor vendor; + String prefixVendor; + + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations= "MA_"; + String prefixFooter = "FT_"; + + String messageFormatKey; + String nativeKey; + String firstAcquirerKey; + String prefixDeliveryAnnotationsKey; + String prefixMessageAnnotationsKey; + String subjectKey; + String contentTypeKey; + String contentEncodingKey; + String replyToGroupIDKey; + String prefixFooterKey; + + private boolean useByteDestinationTypeAnnotations; + + public OutboundTransformer(JMSVendor vendor) { + this.vendor = vendor; + this.setPrefixVendor("JMS_AMQP_"); + } + + public abstract EncodedMessage transform(Message jms) throws Exception; + + public boolean isUseByteDestinationTypeAnnotations() + { + return useByteDestinationTypeAnnotations; + } + + public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations) + { + this.useByteDestinationTypeAnnotations = useByteDestinationTypeAnnotations; + } + + public String getPrefixVendor() { + return prefixVendor; + } + + public void setPrefixVendor(String prefixVendor) { + this.prefixVendor = prefixVendor; + + messageFormatKey = prefixVendor + "MESSAGE_FORMAT"; + nativeKey = prefixVendor + "NATIVE"; + firstAcquirerKey = prefixVendor + "FirstAcquirer"; + prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; + prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations; + subjectKey = prefixVendor +"Subject"; + contentTypeKey = prefixVendor +"ContentType"; + contentEncodingKey = prefixVendor +"ContentEncoding"; + replyToGroupIDKey = prefixVendor +"ReplyToGroupID"; + prefixFooterKey = prefixVendor + prefixFooter; + + } + + public JMSVendor getVendor() { + return vendor; + } + + public void setVendor(JMSVendor vendor) { + this.vendor = vendor; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java new file mode 100644 index 0000000000..576c06ef23 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.message.Message; +import org.junit.Test; +import org.mockito.Mockito; + +public class JMSMappingInboundTransformerTest { + + @Test + public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception { + TextMessage mockTextMessage = createMockTextMessage(); + JMSVendor mockVendor = createMockVendor(mockTextMessage); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor); + + String contentString = "myTextMessageContent"; + Message amqp = Message.Factory.create(); + amqp.setBody(new AmqpValue(contentString)); + + EncodedMessage em = encodeMessage(amqp); + + javax.jms.Message jmsMessage = transformer.transform(em); + + assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); + Mockito.verify(mockTextMessage).setText(contentString); + assertSame("Expected provided mock message, got a different one", mockTextMessage, jmsMessage); + } + + // ======= JMSDestination Handling ========= + + // --- String type annotation --- + @Test + public void testTransformWithNoToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Destination.class, false); + } + + @Test + public void testTransformWithQueueStringToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class, false); + } + + @Test + public void testTransformWithTemporaryQueueStringToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class, false); + } + + @Test + public void testTransformWithTopicStringToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class, false); + } + + @Test + public void testTransformWithTemporaryTopicStringToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class, false); + } + + // --- byte type annotation --- + + @Test + public void testTransformWithNoToTypeDestinationTypeAnnotationUsingByteAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Queue.class, true); + } + + @Test + public void testTransformWithQueueByteToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.QUEUE_TYPE, Queue.class, true); + } + + @Test + public void testTransformWithTemporaryQueueByteToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_QUEUE_TYPE, TemporaryQueue.class, true); + } + + @Test + public void testTransformWithTopicByteToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TOPIC_TYPE, Topic.class, true); + } + + @Test + public void testTransformWithTemporaryTopicByteToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_TOPIC_TYPE, TemporaryTopic.class, true); + } + + private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class expectedClass, + boolean byteType) throws Exception { + TextMessage mockTextMessage = createMockTextMessage(); + JMSVendor mockVendor = createMockVendor(mockTextMessage); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor); + if (byteType) { + transformer.setUseByteDestinationTypeAnnotations(true); + } + + String toAddress = "toAddress"; + Message amqp = Message.Factory.create(); + amqp.setBody(new AmqpValue("myTextMessageContent")); + amqp.setAddress(toAddress); + if (toTypeAnnotationValue != null) { + Map map = new HashMap(); + map.put(Symbol.valueOf("x-opt-to-type"), toTypeAnnotationValue); + MessageAnnotations ma = new MessageAnnotations(map); + amqp.setMessageAnnotations(ma); + } + + EncodedMessage em = encodeMessage(amqp); + + javax.jms.Message jmsMessage = transformer.transform(em); + assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); + + // Verify that createDestination was called with the provided 'to' + // address and 'Destination' class + Mockito.verify(mockVendor).createDestination(toAddress, expectedClass); + } + + // ======= JMSReplyTo Handling ========= + + // --- String type annotation --- + @Test + public void testTransformWithNoReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null, Destination.class, false); + } + + @Test + public void testTransformWithQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class, false); + } + + @Test + public void testTransformWithTemporaryQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class, false); + } + + @Test + public void testTransformWithTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class, false); + } + + @Test + public void testTransformWithTemporaryTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class, false); + } + + // --- byte type annotation --- + @Test + public void testTransformWithNoReplyToTypeDestinationTypeAnnotationUsingByteAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null, Queue.class, true); + } + + @Test + public void testTransformWithQueueByteReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.QUEUE_TYPE, Queue.class, true); + } + + @Test + public void testTransformWithTemporaryQueueByteReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_QUEUE_TYPE, TemporaryQueue.class, true); + } + + @Test + public void testTransformWithTopicByteReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TOPIC_TYPE, Topic.class, true); + } + + @Test + public void testTransformWithTemporaryTopicByteReplyToTypeDestinationTypeAnnotation() throws Exception { + doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_TOPIC_TYPE, TemporaryTopic.class, true); + } + + private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class expectedClass, + boolean byteType) throws Exception { + TextMessage mockTextMessage = createMockTextMessage(); + JMSVendor mockVendor = createMockVendor(mockTextMessage); + JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor); + if (byteType) { + transformer.setUseByteDestinationTypeAnnotations(true); + } + + String replyToAddress = "replyToAddress"; + Message amqp = Message.Factory.create(); + amqp.setBody(new AmqpValue("myTextMessageContent")); + amqp.setReplyTo(replyToAddress); + if (replyToTypeAnnotationValue != null) { + Map map = new HashMap(); + map.put(Symbol.valueOf("x-opt-reply-type"), replyToTypeAnnotationValue); + MessageAnnotations ma = new MessageAnnotations(map); + amqp.setMessageAnnotations(ma); + } + + EncodedMessage em = encodeMessage(amqp); + + javax.jms.Message jmsMessage = transformer.transform(em); + assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage); + + // Verify that createDestination was called with the provided 'replyTo' + // address and 'Destination' class + Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass); + } + + // ======= Utility Methods ========= + + private TextMessage createMockTextMessage() { + TextMessage mockTextMessage = Mockito.mock(TextMessage.class); + + return mockTextMessage; + } + + private JMSVendor createMockVendor(TextMessage mockTextMessage) { + JMSVendor mockVendor = Mockito.mock(JMSVendor.class); + Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage); + + return mockVendor; + } + + private EncodedMessage encodeMessage(Message message) { + byte[] encodeBuffer = new byte[1024 * 8]; + int encodedSize; + while (true) { + try { + encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length); + break; + } catch (java.nio.BufferOverflowException e) { + encodeBuffer = new byte[encodeBuffer.length * 2]; + } + } + + long messageFormat = 0; + return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize); + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java new file mode 100644 index 0000000000..0c4a9c2fb2 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformerTest.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.message; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Collections; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.message.Message; +import org.junit.Test; +import org.mockito.Mockito; + +public class JMSMappingOutboundTransformerTest { + + @Test + public void testConvertMessageWithTextMessageCreatesAmqpValueStringBody() throws Exception { + String contentString = "myTextMessageContent"; + TextMessage mockTextMessage = createMockTextMessage(); + Mockito.when(mockTextMessage.getText()).thenReturn(contentString); + JMSVendor mockVendor = createMockVendor(); + + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor); + + Message amqp = transformer.convert(mockTextMessage); + + assertNotNull(amqp.getBody()); + assertTrue(amqp.getBody() instanceof AmqpValue); + assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue()); + } + + @Test + public void testDefaultsTolStringDestinationTypeAnnotationValues() { + JMSVendor mockVendor = createMockVendor(); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor); + + assertFalse("Expected the older string style annotation values to be used by default", transformer.isUseByteDestinationTypeAnnotations()); + } + + @Test + public void testSetGetIsUseByteDestinationTypeAnnotations() { + JMSVendor mockVendor = createMockVendor(); + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor); + + assertFalse(transformer.isUseByteDestinationTypeAnnotations()); + transformer.setUseByteDestinationTypeAnnotations(true); + assertTrue(transformer.isUseByteDestinationTypeAnnotations()); + } + + // ======= JMSDestination Handling ========= + + // --- String type annotation --- + @Test + public void testConvertMessageWithJMSDestinationNull() throws Exception { + doTestConvertMessageWithJMSDestination(null, null, false); + } + + @Test + public void testConvertMessageWithJMSDestinationQueue() throws Exception { + Queue mockDest = Mockito.mock(Queue.class); + + doTestConvertMessageWithJMSDestination(mockDest, "queue", false); + } + + @Test + public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception { + TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class); + + doTestConvertMessageWithJMSDestination(mockDest, "temporary,queue", false); + } + + @Test + public void testConvertMessageWithJMSDestinationTopic() throws Exception { + Topic mockDest = Mockito.mock(Topic.class); + + doTestConvertMessageWithJMSDestination(mockDest, "topic", false); + } + + @Test + public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception { + TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class); + + doTestConvertMessageWithJMSDestination(mockDest, "temporary,topic", false); + } + + // --- byte type annotation --- + + @Test + public void testConvertMessageWithJMSDestinationNullUsingByteAnnotation() throws Exception { + doTestConvertMessageWithJMSDestination(null, null, true); + } + + @Test + public void testConvertMessageWithJMSDestinationQueueUsingByteAnnotation() throws Exception { + Queue mockDest = Mockito.mock(Queue.class); + + doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.QUEUE_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSDestinationTemporaryQueueUsingByteAnnotation() throws Exception { + TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class); + + doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TEMP_QUEUE_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSDestinationTopicUsingByteAnnotation() throws Exception { + Topic mockDest = Mockito.mock(Topic.class); + + doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TOPIC_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSDestinationTemporaryTopicUsingByteAnnotation() throws Exception { + TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class); + + doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TEMP_TOPIC_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSDestinationUnkownUsingByteAnnotation() throws Exception { + Destination mockDest = Mockito.mock(Destination.class); + + doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.QUEUE_TYPE, true); + } + + private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue, boolean byteType) throws Exception { + TextMessage mockTextMessage = createMockTextMessage(); + Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent"); + Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination); + JMSVendor mockVendor = createMockVendor(); + String toAddress = "someToAddress"; + if (jmsDestination != null) { + Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(toAddress); + } + + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor); + if (byteType) { + transformer.setUseByteDestinationTypeAnnotations(true); + } + + Message amqp = transformer.convert(mockTextMessage); + + MessageAnnotations ma = amqp.getMessageAnnotations(); + Map maMap = ma == null ? null : ma.getValue(); + if (maMap != null) { + Object actualValue = maMap.get(Symbol.valueOf("x-opt-to-type")); + assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue); + } else if (expectedAnnotationValue != null) { + fail("Expected annotation value, but there were no annotations"); + } + + if (jmsDestination != null) { + assertEquals("Unexpected 'to' address", toAddress, amqp.getAddress()); + } + } + + // ======= JMSReplyTo Handling ========= + + // --- String type annotation --- + @Test + public void testConvertMessageWithJMSReplyToNull() throws Exception { + doTestConvertMessageWithJMSReplyTo(null, null, false); + } + + @Test + public void testConvertMessageWithJMSReplyToQueue() throws Exception { + Queue mockDest = Mockito.mock(Queue.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, "queue", false); + } + + @Test + public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception { + TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,queue", false); + } + + @Test + public void testConvertMessageWithJMSReplyToTopic() throws Exception { + Topic mockDest = Mockito.mock(Topic.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, "topic", false); + } + + @Test + public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception { + TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,topic", false); + } + + // --- byte type annotation --- + @Test + public void testConvertMessageWithJMSReplyToNullUsingByteAnnotation() throws Exception { + doTestConvertMessageWithJMSReplyTo(null, null, true); + } + + @Test + public void testConvertMessageWithJMSReplyToQueueUsingByteAnnotation() throws Exception { + Queue mockDest = Mockito.mock(Queue.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.QUEUE_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSReplyToTemporaryQueueUsingByteAnnotation() throws Exception { + TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TEMP_QUEUE_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSReplyToTopicUsingByteAnnotation() throws Exception { + Topic mockDest = Mockito.mock(Topic.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TOPIC_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSReplyToTemporaryTopicUsingByteAnnotation() throws Exception { + TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TEMP_TOPIC_TYPE, true); + } + + @Test + public void testConvertMessageWithJMSReplyToUnkownUsingByteAnnotation() throws Exception { + Destination mockDest = Mockito.mock(Destination.class); + + doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.QUEUE_TYPE, true); + } + + private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue, boolean byteType) throws Exception { + TextMessage mockTextMessage = createMockTextMessage(); + Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent"); + Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo); + JMSVendor mockVendor = createMockVendor(); + String replyToAddress = "someReplyToAddress"; + if (jmsReplyTo != null) { + Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(replyToAddress); + } + + JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor); + if (byteType) { + transformer.setUseByteDestinationTypeAnnotations(true); + } + + Message amqp = transformer.convert(mockTextMessage); + + MessageAnnotations ma = amqp.getMessageAnnotations(); + Map maMap = ma == null ? null : ma.getValue(); + if (maMap != null) { + Object actualValue = maMap.get(Symbol.valueOf("x-opt-reply-type")); + assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue); + } else if (expectedAnnotationValue != null) { + fail("Expected annotation value, but there were no annotations"); + } + + if (jmsReplyTo != null) { + assertEquals("Unexpected 'reply-to' address", replyToAddress, amqp.getReplyTo()); + } + } + + // ======= Utility Methods ========= + + private TextMessage createMockTextMessage() throws Exception { + TextMessage mockTextMessage = Mockito.mock(TextMessage.class); + Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet())); + + return mockTextMessage; + } + + private JMSVendor createMockVendor() { + JMSVendor mockVendor = Mockito.mock(JMSVendor.class); + + return mockVendor; + } +} diff --git a/activemq-tooling/activemq-maven-plugin/pom.xml b/activemq-tooling/activemq-maven-plugin/pom.xml index 95a70e6a95..1c2c4c8f4f 100644 --- a/activemq-tooling/activemq-maven-plugin/pom.xml +++ b/activemq-tooling/activemq-maven-plugin/pom.xml @@ -67,11 +67,9 @@ org.apache.geronimo.specs geronimo-j2ee-management_1.1_spec - org.mockito mockito-core - ${mockito-version} test diff --git a/pom.xml b/pom.xml index 6c83f01082..1b05d36492 100755 --- a/pom.xml +++ b/pom.xml @@ -637,7 +637,6 @@ test - org.apache.xbean @@ -973,6 +972,12 @@ ${junit-version} test + + org.mockito + mockito-core + ${mockito-version} + test + org.jmock jmock-junit4 @@ -1283,17 +1288,17 @@ - - org.apache.maven.plugins - maven-plugin-plugin - [3.1,) - - descriptor - - - - - + + org.apache.maven.plugins + maven-plugin-plugin + [3.1,) + + descriptor + + + + +