mirror of https://github.com/apache/activemq.git
Initial drop of the JMS transformer code to be reworked.
This commit is contained in:
parent
388c16d084
commit
6e69319606
|
@ -42,7 +42,7 @@
|
|||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>proton-jms</artifactId>
|
||||
<artifactId>proton-j</artifactId>
|
||||
<version>${qpid-proton-version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
@ -108,6 +108,11 @@
|
|||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
|
|
|
@ -39,10 +39,8 @@ import org.apache.activemq.command.ActiveMQTempQueue;
|
|||
import org.apache.activemq.command.ActiveMQTempTopic;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.qpid.proton.jms.JMSVendor;
|
||||
import org.apache.activemq.transport.amqp.message.JMSVendor;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ActiveMQJMSVendor extends JMSVendor {
|
||||
|
||||
final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
|
||||
|
|
|
@ -63,6 +63,13 @@ import org.apache.activemq.command.SubscriptionInfo;
|
|||
import org.apache.activemq.command.TransactionInfo;
|
||||
import org.apache.activemq.selector.SelectorParser;
|
||||
import org.apache.activemq.store.PersistenceAdapterSupport;
|
||||
import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.EncodedMessage;
|
||||
import org.apache.activemq.transport.amqp.message.InboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
|
||||
import org.apache.activemq.transport.amqp.message.OutboundTransformer;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
import org.apache.activemq.util.LongSequenceGenerator;
|
||||
|
@ -102,13 +109,6 @@ import org.apache.qpid.proton.engine.impl.CollectorImpl;
|
|||
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
|
||||
import org.apache.qpid.proton.engine.impl.TransportImpl;
|
||||
import org.apache.qpid.proton.framing.TransportFrame;
|
||||
import org.apache.qpid.proton.jms.AMQPNativeInboundTransformer;
|
||||
import org.apache.qpid.proton.jms.AMQPRawInboundTransformer;
|
||||
import org.apache.qpid.proton.jms.AutoOutboundTransformer;
|
||||
import org.apache.qpid.proton.jms.EncodedMessage;
|
||||
import org.apache.qpid.proton.jms.InboundTransformer;
|
||||
import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
|
||||
import org.apache.qpid.proton.jms.OutboundTransformer;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
import org.fusesource.hawtbuf.Buffer;
|
||||
import org.fusesource.hawtbuf.ByteArrayOutputStream;
|
||||
|
|
|
@ -25,10 +25,10 @@ import org.apache.activemq.command.Command;
|
|||
import org.apache.activemq.transport.Transport;
|
||||
import org.apache.activemq.transport.TransportFilter;
|
||||
import org.apache.activemq.transport.TransportListener;
|
||||
import org.apache.activemq.transport.amqp.message.InboundTransformer;
|
||||
import org.apache.activemq.transport.tcp.SslTransport;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.apache.qpid.proton.jms.InboundTransformer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
|
||||
|
||||
public AMQPNativeInboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
|
||||
|
||||
Message rc = super.transform(amqpMessage);
|
||||
|
||||
populateMessage(rc, amqp);
|
||||
return rc;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageFormatException;
|
||||
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
|
||||
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
|
||||
import org.apache.qpid.proton.codec.WritableBuffer;
|
||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||
|
||||
public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
||||
|
||||
public AMQPNativeOutboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EncodedMessage transform(Message msg) throws Exception {
|
||||
if( msg == null )
|
||||
return null;
|
||||
if( !(msg instanceof BytesMessage) )
|
||||
return null;
|
||||
try {
|
||||
if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
||||
return null;
|
||||
}
|
||||
} catch (MessageFormatException e) {
|
||||
return null;
|
||||
}
|
||||
return transform(this, (BytesMessage) msg);
|
||||
}
|
||||
|
||||
static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
|
||||
long messageFormat;
|
||||
try {
|
||||
messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
|
||||
} catch (MessageFormatException e) {
|
||||
return null;
|
||||
}
|
||||
byte data[] = new byte[(int) msg.getBodyLength()];
|
||||
int dataSize = data.length;
|
||||
msg.readBytes(data);
|
||||
msg.reset();
|
||||
|
||||
try {
|
||||
int count = msg.getIntProperty("JMSXDeliveryCount");
|
||||
if( count > 1 ) {
|
||||
|
||||
// decode...
|
||||
ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
|
||||
int offset = 0;
|
||||
int len = data.length;
|
||||
while( len > 0 ) {
|
||||
final int decoded = amqp.decode(data, offset, len);
|
||||
assert decoded > 0: "Make progress decoding the message";
|
||||
offset += decoded;
|
||||
len -= decoded;
|
||||
}
|
||||
|
||||
// Update the DeliveryCount header...
|
||||
// The AMQP delivery-count field only includes prior failed delivery attempts,
|
||||
// whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
|
||||
amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
|
||||
|
||||
// Re-encode...
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
|
||||
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
|
||||
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
|
||||
if( overflow.position() > 0 ) {
|
||||
buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
|
||||
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
|
||||
}
|
||||
data = buffer.array();
|
||||
dataSize = c;
|
||||
}
|
||||
} catch (JMSException e) {
|
||||
}
|
||||
|
||||
return new EncodedMessage(messageFormat, data, 0, dataSize);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Message;
|
||||
|
||||
public class AMQPRawInboundTransformer extends InboundTransformer {
|
||||
|
||||
public AMQPRawInboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
BytesMessage rc = vendor.createBytesMessage();
|
||||
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
|
||||
|
||||
rc.setJMSDeliveryMode(defaultDeliveryMode);
|
||||
rc.setJMSPriority(defaultPriority);
|
||||
|
||||
final long now = System.currentTimeMillis();
|
||||
rc.setJMSTimestamp(now);
|
||||
if( defaultTtl > 0 ) {
|
||||
rc.setJMSExpiration(now + defaultTtl);
|
||||
}
|
||||
|
||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
|
||||
|
||||
return rc;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Message;
|
||||
|
||||
public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
|
||||
|
||||
private final JMSMappingOutboundTransformer transformer;
|
||||
|
||||
public AutoOutboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
transformer = new JMSMappingOutboundTransformer(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EncodedMessage transform(Message msg) throws Exception {
|
||||
if( msg == null )
|
||||
return null;
|
||||
if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
||||
if( msg instanceof BytesMessage ) {
|
||||
return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return transformer.transform(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations)
|
||||
{
|
||||
super.setUseByteDestinationTypeAnnotations(useByteDestinationTypeAnnotations);
|
||||
transformer.setUseByteDestinationTypeAnnotations(useByteDestinationTypeAnnotations);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
|
||||
public class EncodedMessage {
|
||||
|
||||
private final Binary data;
|
||||
final long messageFormat;
|
||||
|
||||
public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
|
||||
this.data = new Binary(data, offset, length);
|
||||
this.messageFormat = messageFormat;
|
||||
}
|
||||
|
||||
public long getMessageFormat() {
|
||||
return messageFormat;
|
||||
}
|
||||
|
||||
public Message decode() throws Exception {
|
||||
Message amqp = Message.Factory.create();
|
||||
|
||||
int offset = getArrayOffset();
|
||||
int len = getLength();
|
||||
while (len > 0) {
|
||||
final int decoded = amqp.decode(getArray(), offset, len);
|
||||
assert decoded > 0 : "Make progress decoding the message";
|
||||
offset += decoded;
|
||||
len -= decoded;
|
||||
}
|
||||
|
||||
return amqp;
|
||||
}
|
||||
|
||||
public int getLength() {
|
||||
return data.getLength();
|
||||
}
|
||||
|
||||
public int getArrayOffset() {
|
||||
return data.getArrayOffset();
|
||||
}
|
||||
|
||||
public byte[] getArray() {
|
||||
return data.getArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return data.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,360 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Decimal128;
|
||||
import org.apache.qpid.proton.amqp.Decimal32;
|
||||
import org.apache.qpid.proton.amqp.Decimal64;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.apache.qpid.proton.amqp.UnsignedLong;
|
||||
import org.apache.qpid.proton.amqp.UnsignedShort;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Footer;
|
||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||
|
||||
public abstract class InboundTransformer {
|
||||
|
||||
JMSVendor vendor;
|
||||
|
||||
public static final String TRANSFORMER_NATIVE = "native";
|
||||
public static final String TRANSFORMER_RAW = "raw";
|
||||
public static final String TRANSFORMER_JMS = "jms";
|
||||
|
||||
String prefixVendor = "JMS_AMQP_";
|
||||
String prefixDeliveryAnnotations = "DA_";
|
||||
String prefixMessageAnnotations = "MA_";
|
||||
String prefixFooter = "FT_";
|
||||
|
||||
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
|
||||
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
|
||||
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
|
||||
|
||||
private boolean useByteDestinationTypeAnnotations = false;
|
||||
|
||||
public InboundTransformer(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
}
|
||||
|
||||
abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
|
||||
|
||||
public boolean isUseByteDestinationTypeAnnotations() {
|
||||
return useByteDestinationTypeAnnotations;
|
||||
}
|
||||
|
||||
public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations) {
|
||||
this.useByteDestinationTypeAnnotations = useByteDestinationTypeAnnotations;
|
||||
}
|
||||
|
||||
public int getDefaultDeliveryMode() {
|
||||
return defaultDeliveryMode;
|
||||
}
|
||||
|
||||
public void setDefaultDeliveryMode(int defaultDeliveryMode) {
|
||||
this.defaultDeliveryMode = defaultDeliveryMode;
|
||||
}
|
||||
|
||||
public int getDefaultPriority() {
|
||||
return defaultPriority;
|
||||
}
|
||||
|
||||
public void setDefaultPriority(int defaultPriority) {
|
||||
this.defaultPriority = defaultPriority;
|
||||
}
|
||||
|
||||
public long getDefaultTtl() {
|
||||
return defaultTtl;
|
||||
}
|
||||
|
||||
public void setDefaultTtl(long defaultTtl) {
|
||||
this.defaultTtl = defaultTtl;
|
||||
}
|
||||
|
||||
public String getPrefixVendor() {
|
||||
return prefixVendor;
|
||||
}
|
||||
|
||||
public void setPrefixVendor(String prefixVendor) {
|
||||
this.prefixVendor = prefixVendor;
|
||||
}
|
||||
|
||||
public JMSVendor getVendor() {
|
||||
return vendor;
|
||||
}
|
||||
|
||||
public void setVendor(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
|
||||
Header header = amqp.getHeader();
|
||||
if (header == null) {
|
||||
header = new Header();
|
||||
}
|
||||
|
||||
if (header.getDurable() != null) {
|
||||
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||
} else {
|
||||
jms.setJMSDeliveryMode(defaultDeliveryMode);
|
||||
}
|
||||
if (header.getPriority() != null) {
|
||||
jms.setJMSPriority(header.getPriority().intValue());
|
||||
} else {
|
||||
jms.setJMSPriority(defaultPriority);
|
||||
}
|
||||
if (header.getFirstAcquirer() != null) {
|
||||
jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
|
||||
}
|
||||
if (header.getDeliveryCount() != null) {
|
||||
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
|
||||
}
|
||||
|
||||
final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
|
||||
if (da != null) {
|
||||
for (Map.Entry<?, ?> entry : da.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
Class<? extends Destination> toAttributes = null;
|
||||
Class<? extends Destination> replyToAttributes = null;
|
||||
|
||||
if (isUseByteDestinationTypeAnnotations()) {
|
||||
toAttributes = Queue.class;
|
||||
replyToAttributes = Queue.class;
|
||||
} else {
|
||||
toAttributes = Destination.class;
|
||||
replyToAttributes = Destination.class;
|
||||
}
|
||||
|
||||
final MessageAnnotations ma = amqp.getMessageAnnotations();
|
||||
if (ma != null) {
|
||||
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) {
|
||||
jms.setJMSType(entry.getValue().toString());
|
||||
} else if ("x-opt-to-type".equals(key.toString())) {
|
||||
toAttributes = toClassFromAttributes(entry.getValue());
|
||||
} else if ("x-opt-reply-type".equals(key.toString())) {
|
||||
replyToAttributes = toClassFromAttributes(entry.getValue());
|
||||
} else {
|
||||
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final ApplicationProperties ap = amqp.getApplicationProperties();
|
||||
if (ap != null) {
|
||||
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
if ("JMSXGroupID".equals(key)) {
|
||||
vendor.setJMSXGroupID(jms, entry.getValue().toString());
|
||||
} else if ("JMSXGroupSequence".equals(key)) {
|
||||
vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue());
|
||||
} else if ("JMSXUserID".equals(key)) {
|
||||
vendor.setJMSXUserID(jms, entry.getValue().toString());
|
||||
} else {
|
||||
setProperty(jms, key, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Properties properties = amqp.getProperties();
|
||||
if (properties != null) {
|
||||
if (properties.getMessageId() != null) {
|
||||
jms.setJMSMessageID(properties.getMessageId().toString());
|
||||
}
|
||||
Binary userId = properties.getUserId();
|
||||
if (userId != null) {
|
||||
vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
|
||||
}
|
||||
if (properties.getTo() != null) {
|
||||
jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes));
|
||||
}
|
||||
if (properties.getSubject() != null) {
|
||||
jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
|
||||
}
|
||||
if (properties.getReplyTo() != null) {
|
||||
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes));
|
||||
}
|
||||
if (properties.getCorrelationId() != null) {
|
||||
jms.setJMSCorrelationID(properties.getCorrelationId().toString());
|
||||
}
|
||||
if (properties.getContentType() != null) {
|
||||
jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
|
||||
}
|
||||
if (properties.getContentEncoding() != null) {
|
||||
jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
|
||||
}
|
||||
if (properties.getCreationTime() != null) {
|
||||
jms.setJMSTimestamp(properties.getCreationTime().getTime());
|
||||
}
|
||||
if (properties.getGroupId() != null) {
|
||||
vendor.setJMSXGroupID(jms, properties.getGroupId());
|
||||
}
|
||||
if (properties.getGroupSequence() != null) {
|
||||
vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
|
||||
}
|
||||
if (properties.getReplyToGroupId() != null) {
|
||||
jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
|
||||
}
|
||||
if (properties.getAbsoluteExpiryTime() != null) {
|
||||
jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
|
||||
}
|
||||
}
|
||||
|
||||
// If the jms expiration has not yet been set...
|
||||
if (jms.getJMSExpiration() == 0) {
|
||||
// Then lets try to set it based on the message ttl.
|
||||
long ttl = defaultTtl;
|
||||
if (header.getTtl() != null) {
|
||||
ttl = header.getTtl().longValue();
|
||||
}
|
||||
if (ttl == 0) {
|
||||
jms.setJMSExpiration(0);
|
||||
} else {
|
||||
jms.setJMSExpiration(System.currentTimeMillis() + ttl);
|
||||
}
|
||||
}
|
||||
|
||||
final Footer fp = amqp.getFooter();
|
||||
if (fp != null) {
|
||||
for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue");
|
||||
private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic");
|
||||
private static final Set<String> TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary");
|
||||
private static final Set<String> TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary");
|
||||
|
||||
private static Set<String> createSet(String... args) {
|
||||
HashSet<String> s = new HashSet<String>();
|
||||
for (String arg : args) {
|
||||
s.add(arg);
|
||||
}
|
||||
return Collections.unmodifiableSet(s);
|
||||
}
|
||||
|
||||
Class<? extends Destination> toClassFromAttributes(Object value) {
|
||||
if (isUseByteDestinationTypeAnnotations()) {
|
||||
if (value instanceof Byte) {
|
||||
switch ((Byte) value) {
|
||||
case JMSVendor.QUEUE_TYPE:
|
||||
return Queue.class;
|
||||
case JMSVendor.TOPIC_TYPE:
|
||||
return Topic.class;
|
||||
case JMSVendor.TEMP_QUEUE_TYPE:
|
||||
return TemporaryQueue.class;
|
||||
case JMSVendor.TEMP_TOPIC_TYPE:
|
||||
return TemporaryTopic.class;
|
||||
default:
|
||||
return Queue.class;
|
||||
}
|
||||
}
|
||||
|
||||
return Queue.class;
|
||||
} else {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
String valueString = value.toString();
|
||||
HashSet<String> attributes = new HashSet<String>();
|
||||
for (String x : valueString.split("\\s*,\\s*")) {
|
||||
attributes.add(x);
|
||||
}
|
||||
|
||||
if (QUEUE_ATTRIBUTES.equals(attributes)) {
|
||||
return Queue.class;
|
||||
}
|
||||
if (TOPIC_ATTRIBUTES.equals(attributes)) {
|
||||
return Topic.class;
|
||||
}
|
||||
if (TEMP_QUEUE_ATTRIBUTES.equals(attributes)) {
|
||||
return TemporaryQueue.class;
|
||||
}
|
||||
if (TEMP_TOPIC_ATTRIBUTES.equals(attributes)) {
|
||||
return TemporaryTopic.class;
|
||||
}
|
||||
return Destination.class;
|
||||
}
|
||||
}
|
||||
|
||||
private void setProperty(Message msg, String key, Object value) throws JMSException {
|
||||
if (value instanceof UnsignedLong) {
|
||||
long v = ((UnsignedLong) value).longValue();
|
||||
msg.setLongProperty(key, v);
|
||||
} else if (value instanceof UnsignedInteger) {
|
||||
long v = ((UnsignedInteger) value).longValue();
|
||||
if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
|
||||
msg.setIntProperty(key, (int) v);
|
||||
} else {
|
||||
msg.setLongProperty(key, v);
|
||||
}
|
||||
} else if (value instanceof UnsignedShort) {
|
||||
int v = ((UnsignedShort) value).intValue();
|
||||
if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
|
||||
msg.setShortProperty(key, (short) v);
|
||||
} else {
|
||||
msg.setIntProperty(key, v);
|
||||
}
|
||||
} else if (value instanceof UnsignedByte) {
|
||||
short v = ((UnsignedByte) value).shortValue();
|
||||
if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
|
||||
msg.setByteProperty(key, (byte) v);
|
||||
} else {
|
||||
msg.setShortProperty(key, v);
|
||||
}
|
||||
} else if (value instanceof Symbol) {
|
||||
msg.setStringProperty(key, value.toString());
|
||||
} else if (value instanceof Decimal128) {
|
||||
msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
|
||||
} else if (value instanceof Decimal64) {
|
||||
msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
|
||||
} else if (value instanceof Decimal32) {
|
||||
msg.setFloatProperty(key, ((Decimal32) value).floatValue());
|
||||
} else if (value instanceof Binary) {
|
||||
msg.setStringProperty(key, value.toString());
|
||||
} else {
|
||||
msg.setObjectProperty(key, value);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.apache.qpid.proton.amqp.messaging.Section;
|
||||
|
||||
public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||
|
||||
public JMSMappingInboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked" })
|
||||
@Override
|
||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
|
||||
|
||||
Message rc;
|
||||
final Section body = amqp.getBody();
|
||||
if (body == null) {
|
||||
rc = vendor.createMessage();
|
||||
} else if (body instanceof Data) {
|
||||
Binary d = ((Data) body).getValue();
|
||||
BytesMessage m = vendor.createBytesMessage();
|
||||
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
|
||||
rc = m;
|
||||
} else if (body instanceof AmqpSequence) {
|
||||
AmqpSequence sequence = (AmqpSequence) body;
|
||||
StreamMessage m = vendor.createStreamMessage();
|
||||
for (Object item : sequence.getValue()) {
|
||||
m.writeObject(item);
|
||||
}
|
||||
rc = m;
|
||||
} else if (body instanceof AmqpValue) {
|
||||
Object value = ((AmqpValue) body).getValue();
|
||||
if (value == null) {
|
||||
rc = vendor.createObjectMessage();
|
||||
}
|
||||
if (value instanceof String) {
|
||||
TextMessage m = vendor.createTextMessage();
|
||||
m.setText((String) value);
|
||||
rc = m;
|
||||
} else if (value instanceof Binary) {
|
||||
Binary d = (Binary) value;
|
||||
BytesMessage m = vendor.createBytesMessage();
|
||||
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
|
||||
rc = m;
|
||||
} else if (value instanceof List) {
|
||||
StreamMessage m = vendor.createStreamMessage();
|
||||
for (Object item : (List<Object>) value) {
|
||||
m.writeObject(item);
|
||||
}
|
||||
rc = m;
|
||||
} else if (value instanceof Map) {
|
||||
MapMessage m = vendor.createMapMessage();
|
||||
final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet();
|
||||
for (Map.Entry<String, Object> entry : set) {
|
||||
m.setObject(entry.getKey(), entry.getValue());
|
||||
}
|
||||
rc = m;
|
||||
} else {
|
||||
ObjectMessage m = vendor.createObjectMessage();
|
||||
m.setObject((Serializable) value);
|
||||
rc = m;
|
||||
}
|
||||
} else {
|
||||
throw new RuntimeException("Unexpected body type: " + body.getClass());
|
||||
}
|
||||
rc.setJMSDeliveryMode(defaultDeliveryMode);
|
||||
rc.setJMSPriority(defaultPriority);
|
||||
rc.setJMSExpiration(defaultTtl);
|
||||
|
||||
populateMessage(rc, amqp);
|
||||
|
||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
|
||||
return rc;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,314 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageEOFException;
|
||||
import javax.jms.MessageFormatException;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Binary;
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.UnsignedByte;
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
|
||||
import org.apache.qpid.proton.amqp.messaging.Data;
|
||||
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Footer;
|
||||
import org.apache.qpid.proton.amqp.messaging.Header;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.amqp.messaging.Properties;
|
||||
import org.apache.qpid.proton.amqp.messaging.Section;
|
||||
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
|
||||
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
|
||||
import org.apache.qpid.proton.codec.WritableBuffer;
|
||||
import org.apache.qpid.proton.message.ProtonJMessage;
|
||||
|
||||
public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
||||
|
||||
public JMSMappingOutboundTransformer(JMSVendor vendor) {
|
||||
super(vendor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EncodedMessage transform(Message msg) throws Exception {
|
||||
if (msg == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
if (msg.getBooleanProperty(prefixVendor + "NATIVE")) {
|
||||
return null;
|
||||
}
|
||||
} catch (MessageFormatException e) {
|
||||
return null;
|
||||
}
|
||||
ProtonJMessage amqp = convert(msg);
|
||||
|
||||
long messageFormat;
|
||||
try {
|
||||
messageFormat = msg.getLongProperty(this.messageFormatKey);
|
||||
} catch (MessageFormatException e) {
|
||||
return null;
|
||||
}
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
|
||||
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
|
||||
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
|
||||
if (overflow.position() > 0) {
|
||||
buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
|
||||
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
|
||||
}
|
||||
|
||||
return new EncodedMessage(messageFormat, buffer.array(), 0, c);
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the conversion between JMS Message and Proton Message without
|
||||
* re-encoding it to array. This is needed because some frameworks may elect
|
||||
* to do this on their own way (Netty for instance using Nettybuffers)
|
||||
*
|
||||
* @param msg
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
public ProtonJMessage convert(Message msg) throws JMSException, UnsupportedEncodingException {
|
||||
Header header = new Header();
|
||||
Properties props = new Properties();
|
||||
HashMap<Symbol, Object> daMap = null;
|
||||
HashMap<Symbol, Object> maMap = null;
|
||||
HashMap apMap = null;
|
||||
Section body = null;
|
||||
HashMap footerMap = null;
|
||||
if (msg instanceof BytesMessage) {
|
||||
BytesMessage m = (BytesMessage) msg;
|
||||
byte data[] = new byte[(int) m.getBodyLength()];
|
||||
m.readBytes(data);
|
||||
m.reset(); // Need to reset after readBytes or future readBytes
|
||||
// calls (ex: redeliveries) will fail and return -1
|
||||
body = new Data(new Binary(data));
|
||||
}
|
||||
if (msg instanceof TextMessage) {
|
||||
body = new AmqpValue(((TextMessage) msg).getText());
|
||||
}
|
||||
if (msg instanceof MapMessage) {
|
||||
final HashMap<String, Object> map = new HashMap<String, Object>();
|
||||
final MapMessage m = (MapMessage) msg;
|
||||
final Enumeration<String> names = m.getMapNames();
|
||||
while (names.hasMoreElements()) {
|
||||
String key = names.nextElement();
|
||||
map.put(key, m.getObject(key));
|
||||
}
|
||||
body = new AmqpValue(map);
|
||||
}
|
||||
if (msg instanceof StreamMessage) {
|
||||
ArrayList<Object> list = new ArrayList<Object>();
|
||||
final StreamMessage m = (StreamMessage) msg;
|
||||
try {
|
||||
while (true) {
|
||||
list.add(m.readObject());
|
||||
}
|
||||
} catch (MessageEOFException e) {
|
||||
}
|
||||
body = new AmqpSequence(list);
|
||||
}
|
||||
if (msg instanceof ObjectMessage) {
|
||||
body = new AmqpValue(((ObjectMessage) msg).getObject());
|
||||
}
|
||||
|
||||
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
|
||||
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
|
||||
if (msg.getJMSType() != null) {
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType());
|
||||
}
|
||||
if (msg.getJMSMessageID() != null) {
|
||||
props.setMessageId(msg.getJMSMessageID());
|
||||
}
|
||||
if (msg.getJMSDestination() != null) {
|
||||
props.setTo(vendor.toAddress(msg.getJMSDestination()));
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
maMap.put(Symbol.valueOf("x-opt-to-type"), destinationAttributes(msg.getJMSDestination()));
|
||||
}
|
||||
if (msg.getJMSReplyTo() != null) {
|
||||
props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
maMap.put(Symbol.valueOf("x-opt-reply-type"), destinationAttributes(msg.getJMSReplyTo()));
|
||||
}
|
||||
if (msg.getJMSCorrelationID() != null) {
|
||||
props.setCorrelationId(msg.getJMSCorrelationID());
|
||||
}
|
||||
if (msg.getJMSExpiration() != 0) {
|
||||
long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
|
||||
if (ttl < 0) {
|
||||
ttl = 1;
|
||||
}
|
||||
header.setTtl(new UnsignedInteger((int) ttl));
|
||||
|
||||
props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
|
||||
}
|
||||
if (msg.getJMSTimestamp() != 0) {
|
||||
props.setCreationTime(new Date(msg.getJMSTimestamp()));
|
||||
}
|
||||
|
||||
final Enumeration<String> keys = msg.getPropertyNames();
|
||||
while (keys.hasMoreElements()) {
|
||||
String key = keys.nextElement();
|
||||
if (key.equals(messageFormatKey) || key.equals(nativeKey)) {
|
||||
// skip..
|
||||
} else if (key.equals(firstAcquirerKey)) {
|
||||
header.setFirstAcquirer(msg.getBooleanProperty(key));
|
||||
} else if (key.startsWith("JMSXDeliveryCount")) {
|
||||
// The AMQP delivery-count field only includes prior failed delivery attempts,
|
||||
// whereas JMSXDeliveryCount includes the first/current delivery attempt.
|
||||
int amqpDeliveryCount = msg.getIntProperty(key) - 1;
|
||||
if (amqpDeliveryCount > 0) {
|
||||
header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
|
||||
}
|
||||
} else if (key.startsWith("JMSXUserID")) {
|
||||
String value = msg.getStringProperty(key);
|
||||
props.setUserId(new Binary(value.getBytes("UTF-8")));
|
||||
} else if (key.startsWith("JMSXGroupID")) {
|
||||
String value = msg.getStringProperty(key);
|
||||
props.setGroupId(value);
|
||||
if (apMap == null) {
|
||||
apMap = new HashMap();
|
||||
}
|
||||
apMap.put(key, value);
|
||||
} else if (key.startsWith("JMSXGroupSeq")) {
|
||||
UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
|
||||
props.setGroupSequence(value);
|
||||
if (apMap == null) {
|
||||
apMap = new HashMap();
|
||||
}
|
||||
apMap.put(key, value);
|
||||
} else if (key.startsWith(prefixDeliveryAnnotationsKey)) {
|
||||
if (daMap == null) {
|
||||
daMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
String name = key.substring(prefixDeliveryAnnotationsKey.length());
|
||||
daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
|
||||
} else if (key.startsWith(prefixMessageAnnotationsKey)) {
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
String name = key.substring(prefixMessageAnnotationsKey.length());
|
||||
maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
|
||||
} else if (key.equals(subjectKey)) {
|
||||
props.setSubject(msg.getStringProperty(key));
|
||||
} else if (key.equals(contentTypeKey)) {
|
||||
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
|
||||
} else if (key.equals(contentEncodingKey)) {
|
||||
props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
|
||||
} else if (key.equals(replyToGroupIDKey)) {
|
||||
props.setReplyToGroupId(msg.getStringProperty(key));
|
||||
} else if (key.startsWith(prefixFooterKey)) {
|
||||
if (footerMap == null) {
|
||||
footerMap = new HashMap();
|
||||
}
|
||||
String name = key.substring(prefixFooterKey.length());
|
||||
footerMap.put(name, msg.getObjectProperty(key));
|
||||
} else {
|
||||
if (apMap == null) {
|
||||
apMap = new HashMap();
|
||||
}
|
||||
apMap.put(key, msg.getObjectProperty(key));
|
||||
}
|
||||
}
|
||||
|
||||
MessageAnnotations ma = null;
|
||||
if (maMap != null) {
|
||||
ma = new MessageAnnotations(maMap);
|
||||
}
|
||||
DeliveryAnnotations da = null;
|
||||
if (daMap != null) {
|
||||
da = new DeliveryAnnotations(daMap);
|
||||
}
|
||||
ApplicationProperties ap = null;
|
||||
if (apMap != null) {
|
||||
ap = new ApplicationProperties(apMap);
|
||||
}
|
||||
Footer footer = null;
|
||||
if (footerMap != null) {
|
||||
footer = new Footer(footerMap);
|
||||
}
|
||||
|
||||
return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
|
||||
}
|
||||
|
||||
private Object destinationAttributes(Destination destination) {
|
||||
if (isUseByteDestinationTypeAnnotations()) {
|
||||
if (destination instanceof Queue) {
|
||||
if (destination instanceof TemporaryQueue) {
|
||||
return JMSVendor.TEMP_QUEUE_TYPE;
|
||||
} else {
|
||||
return JMSVendor.QUEUE_TYPE;
|
||||
}
|
||||
}
|
||||
if (destination instanceof Topic) {
|
||||
if (destination instanceof TemporaryTopic) {
|
||||
return JMSVendor.TEMP_TOPIC_TYPE;
|
||||
} else {
|
||||
return JMSVendor.TOPIC_TYPE;
|
||||
}
|
||||
}
|
||||
return JMSVendor.QUEUE_TYPE;
|
||||
} else {
|
||||
if (destination instanceof Queue) {
|
||||
if (destination instanceof TemporaryQueue) {
|
||||
return "temporary,queue";
|
||||
} else {
|
||||
return "queue";
|
||||
}
|
||||
}
|
||||
if (destination instanceof Topic) {
|
||||
if (destination instanceof TemporaryTopic) {
|
||||
return "temporary,topic";
|
||||
} else {
|
||||
return "topic";
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import javax.jms.BytesMessage;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.StreamMessage;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
abstract public class JMSVendor {
|
||||
|
||||
public static final byte QUEUE_TYPE = 0x00;
|
||||
public static final byte TOPIC_TYPE = 0x01;
|
||||
public static final byte TEMP_QUEUE_TYPE = 0x02;
|
||||
public static final byte TEMP_TOPIC_TYPE = 0x03;
|
||||
|
||||
public abstract BytesMessage createBytesMessage();
|
||||
|
||||
public abstract StreamMessage createStreamMessage();
|
||||
|
||||
public abstract Message createMessage();
|
||||
|
||||
public abstract TextMessage createTextMessage();
|
||||
|
||||
public abstract ObjectMessage createObjectMessage();
|
||||
|
||||
public abstract MapMessage createMapMessage();
|
||||
|
||||
public abstract void setJMSXUserID(Message message, String value);
|
||||
|
||||
@Deprecated
|
||||
public Destination createDestination(String name) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public <T extends Destination> T createDestination(String name, Class<T> kind) {
|
||||
return kind.cast(createDestination(name));
|
||||
}
|
||||
|
||||
public abstract void setJMSXGroupID(Message message, String groupId);
|
||||
|
||||
public abstract void setJMSXGroupSequence(Message message, int value);
|
||||
|
||||
public abstract void setJMSXDeliveryCount(Message message, long value);
|
||||
|
||||
public abstract String toAddress(Destination destination);
|
||||
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import javax.jms.Message;
|
||||
|
||||
public abstract class OutboundTransformer {
|
||||
|
||||
JMSVendor vendor;
|
||||
String prefixVendor;
|
||||
|
||||
String prefixDeliveryAnnotations = "DA_";
|
||||
String prefixMessageAnnotations= "MA_";
|
||||
String prefixFooter = "FT_";
|
||||
|
||||
String messageFormatKey;
|
||||
String nativeKey;
|
||||
String firstAcquirerKey;
|
||||
String prefixDeliveryAnnotationsKey;
|
||||
String prefixMessageAnnotationsKey;
|
||||
String subjectKey;
|
||||
String contentTypeKey;
|
||||
String contentEncodingKey;
|
||||
String replyToGroupIDKey;
|
||||
String prefixFooterKey;
|
||||
|
||||
private boolean useByteDestinationTypeAnnotations;
|
||||
|
||||
public OutboundTransformer(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
this.setPrefixVendor("JMS_AMQP_");
|
||||
}
|
||||
|
||||
public abstract EncodedMessage transform(Message jms) throws Exception;
|
||||
|
||||
public boolean isUseByteDestinationTypeAnnotations()
|
||||
{
|
||||
return useByteDestinationTypeAnnotations;
|
||||
}
|
||||
|
||||
public void setUseByteDestinationTypeAnnotations(boolean useByteDestinationTypeAnnotations)
|
||||
{
|
||||
this.useByteDestinationTypeAnnotations = useByteDestinationTypeAnnotations;
|
||||
}
|
||||
|
||||
public String getPrefixVendor() {
|
||||
return prefixVendor;
|
||||
}
|
||||
|
||||
public void setPrefixVendor(String prefixVendor) {
|
||||
this.prefixVendor = prefixVendor;
|
||||
|
||||
messageFormatKey = prefixVendor + "MESSAGE_FORMAT";
|
||||
nativeKey = prefixVendor + "NATIVE";
|
||||
firstAcquirerKey = prefixVendor + "FirstAcquirer";
|
||||
prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
|
||||
prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
|
||||
subjectKey = prefixVendor +"Subject";
|
||||
contentTypeKey = prefixVendor +"ContentType";
|
||||
contentEncodingKey = prefixVendor +"ContentEncoding";
|
||||
replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
|
||||
prefixFooterKey = prefixVendor + prefixFooter;
|
||||
|
||||
}
|
||||
|
||||
public JMSVendor getVendor() {
|
||||
return vendor;
|
||||
}
|
||||
|
||||
public void setVendor(JMSVendor vendor) {
|
||||
this.vendor = vendor;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,259 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class JMSMappingInboundTransformerTest {
|
||||
|
||||
@Test
|
||||
public void testTransformMessageWithAmqpValueStringCreatesTextMessage() throws Exception {
|
||||
TextMessage mockTextMessage = createMockTextMessage();
|
||||
JMSVendor mockVendor = createMockVendor(mockTextMessage);
|
||||
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
|
||||
|
||||
String contentString = "myTextMessageContent";
|
||||
Message amqp = Message.Factory.create();
|
||||
amqp.setBody(new AmqpValue(contentString));
|
||||
|
||||
EncodedMessage em = encodeMessage(amqp);
|
||||
|
||||
javax.jms.Message jmsMessage = transformer.transform(em);
|
||||
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
|
||||
Mockito.verify(mockTextMessage).setText(contentString);
|
||||
assertSame("Expected provided mock message, got a different one", mockTextMessage, jmsMessage);
|
||||
}
|
||||
|
||||
// ======= JMSDestination Handling =========
|
||||
|
||||
// --- String type annotation ---
|
||||
@Test
|
||||
public void testTransformWithNoToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Destination.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithQueueStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryQueueStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTopicStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryTopicStringToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class, false);
|
||||
}
|
||||
|
||||
// --- byte type annotation ---
|
||||
|
||||
@Test
|
||||
public void testTransformWithNoToTypeDestinationTypeAnnotationUsingByteAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(null, Queue.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithQueueByteToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.QUEUE_TYPE, Queue.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryQueueByteToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_QUEUE_TYPE, TemporaryQueue.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTopicByteToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TOPIC_TYPE, Topic.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryTopicByteToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_TOPIC_TYPE, TemporaryTopic.class, true);
|
||||
}
|
||||
|
||||
private void doTransformWithToTypeDestinationTypeAnnotationTestImpl(Object toTypeAnnotationValue, Class<? extends Destination> expectedClass,
|
||||
boolean byteType) throws Exception {
|
||||
TextMessage mockTextMessage = createMockTextMessage();
|
||||
JMSVendor mockVendor = createMockVendor(mockTextMessage);
|
||||
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
|
||||
if (byteType) {
|
||||
transformer.setUseByteDestinationTypeAnnotations(true);
|
||||
}
|
||||
|
||||
String toAddress = "toAddress";
|
||||
Message amqp = Message.Factory.create();
|
||||
amqp.setBody(new AmqpValue("myTextMessageContent"));
|
||||
amqp.setAddress(toAddress);
|
||||
if (toTypeAnnotationValue != null) {
|
||||
Map<Symbol, Object> map = new HashMap<Symbol, Object>();
|
||||
map.put(Symbol.valueOf("x-opt-to-type"), toTypeAnnotationValue);
|
||||
MessageAnnotations ma = new MessageAnnotations(map);
|
||||
amqp.setMessageAnnotations(ma);
|
||||
}
|
||||
|
||||
EncodedMessage em = encodeMessage(amqp);
|
||||
|
||||
javax.jms.Message jmsMessage = transformer.transform(em);
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
|
||||
|
||||
// Verify that createDestination was called with the provided 'to'
|
||||
// address and 'Destination' class
|
||||
Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
|
||||
}
|
||||
|
||||
// ======= JMSReplyTo Handling =========
|
||||
|
||||
// --- String type annotation ---
|
||||
@Test
|
||||
public void testTransformWithNoReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null, Destination.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue", Queue.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryQueueStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("queue,temporary", TemporaryQueue.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic", Topic.class, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryTopicStringReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl("topic,temporary", TemporaryTopic.class, false);
|
||||
}
|
||||
|
||||
// --- byte type annotation ---
|
||||
@Test
|
||||
public void testTransformWithNoReplyToTypeDestinationTypeAnnotationUsingByteAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(null, Queue.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithQueueByteReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.QUEUE_TYPE, Queue.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryQueueByteReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_QUEUE_TYPE, TemporaryQueue.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTopicByteReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TOPIC_TYPE, Topic.class, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTransformWithTemporaryTopicByteReplyToTypeDestinationTypeAnnotation() throws Exception {
|
||||
doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(JMSVendor.TEMP_TOPIC_TYPE, TemporaryTopic.class, true);
|
||||
}
|
||||
|
||||
private void doTransformWithReplyToTypeDestinationTypeAnnotationTestImpl(Object replyToTypeAnnotationValue, Class<? extends Destination> expectedClass,
|
||||
boolean byteType) throws Exception {
|
||||
TextMessage mockTextMessage = createMockTextMessage();
|
||||
JMSVendor mockVendor = createMockVendor(mockTextMessage);
|
||||
JMSMappingInboundTransformer transformer = new JMSMappingInboundTransformer(mockVendor);
|
||||
if (byteType) {
|
||||
transformer.setUseByteDestinationTypeAnnotations(true);
|
||||
}
|
||||
|
||||
String replyToAddress = "replyToAddress";
|
||||
Message amqp = Message.Factory.create();
|
||||
amqp.setBody(new AmqpValue("myTextMessageContent"));
|
||||
amqp.setReplyTo(replyToAddress);
|
||||
if (replyToTypeAnnotationValue != null) {
|
||||
Map<Symbol, Object> map = new HashMap<Symbol, Object>();
|
||||
map.put(Symbol.valueOf("x-opt-reply-type"), replyToTypeAnnotationValue);
|
||||
MessageAnnotations ma = new MessageAnnotations(map);
|
||||
amqp.setMessageAnnotations(ma);
|
||||
}
|
||||
|
||||
EncodedMessage em = encodeMessage(amqp);
|
||||
|
||||
javax.jms.Message jmsMessage = transformer.transform(em);
|
||||
assertTrue("Expected TextMessage", jmsMessage instanceof TextMessage);
|
||||
|
||||
// Verify that createDestination was called with the provided 'replyTo'
|
||||
// address and 'Destination' class
|
||||
Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
|
||||
}
|
||||
|
||||
// ======= Utility Methods =========
|
||||
|
||||
private TextMessage createMockTextMessage() {
|
||||
TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
|
||||
|
||||
return mockTextMessage;
|
||||
}
|
||||
|
||||
private JMSVendor createMockVendor(TextMessage mockTextMessage) {
|
||||
JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
|
||||
Mockito.when(mockVendor.createTextMessage()).thenReturn(mockTextMessage);
|
||||
|
||||
return mockVendor;
|
||||
}
|
||||
|
||||
private EncodedMessage encodeMessage(Message message) {
|
||||
byte[] encodeBuffer = new byte[1024 * 8];
|
||||
int encodedSize;
|
||||
while (true) {
|
||||
try {
|
||||
encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
|
||||
break;
|
||||
} catch (java.nio.BufferOverflowException e) {
|
||||
encodeBuffer = new byte[encodeBuffer.length * 2];
|
||||
}
|
||||
}
|
||||
|
||||
long messageFormat = 0;
|
||||
return new EncodedMessage(messageFormat, encodeBuffer, 0, encodedSize);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,309 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.transport.amqp.message;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.TemporaryQueue;
|
||||
import javax.jms.TemporaryTopic;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.qpid.proton.amqp.Symbol;
|
||||
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
|
||||
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
|
||||
import org.apache.qpid.proton.message.Message;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class JMSMappingOutboundTransformerTest {
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithTextMessageCreatesAmqpValueStringBody() throws Exception {
|
||||
String contentString = "myTextMessageContent";
|
||||
TextMessage mockTextMessage = createMockTextMessage();
|
||||
Mockito.when(mockTextMessage.getText()).thenReturn(contentString);
|
||||
JMSVendor mockVendor = createMockVendor();
|
||||
|
||||
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
|
||||
|
||||
Message amqp = transformer.convert(mockTextMessage);
|
||||
|
||||
assertNotNull(amqp.getBody());
|
||||
assertTrue(amqp.getBody() instanceof AmqpValue);
|
||||
assertEquals(contentString, ((AmqpValue) amqp.getBody()).getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultsTolStringDestinationTypeAnnotationValues() {
|
||||
JMSVendor mockVendor = createMockVendor();
|
||||
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
|
||||
|
||||
assertFalse("Expected the older string style annotation values to be used by default", transformer.isUseByteDestinationTypeAnnotations());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetGetIsUseByteDestinationTypeAnnotations() {
|
||||
JMSVendor mockVendor = createMockVendor();
|
||||
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
|
||||
|
||||
assertFalse(transformer.isUseByteDestinationTypeAnnotations());
|
||||
transformer.setUseByteDestinationTypeAnnotations(true);
|
||||
assertTrue(transformer.isUseByteDestinationTypeAnnotations());
|
||||
}
|
||||
|
||||
// ======= JMSDestination Handling =========
|
||||
|
||||
// --- String type annotation ---
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationNull() throws Exception {
|
||||
doTestConvertMessageWithJMSDestination(null, null, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationQueue() throws Exception {
|
||||
Queue mockDest = Mockito.mock(Queue.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, "queue", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception {
|
||||
TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, "temporary,queue", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationTopic() throws Exception {
|
||||
Topic mockDest = Mockito.mock(Topic.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, "topic", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception {
|
||||
TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, "temporary,topic", false);
|
||||
}
|
||||
|
||||
// --- byte type annotation ---
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationNullUsingByteAnnotation() throws Exception {
|
||||
doTestConvertMessageWithJMSDestination(null, null, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationQueueUsingByteAnnotation() throws Exception {
|
||||
Queue mockDest = Mockito.mock(Queue.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.QUEUE_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationTemporaryQueueUsingByteAnnotation() throws Exception {
|
||||
TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TEMP_QUEUE_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationTopicUsingByteAnnotation() throws Exception {
|
||||
Topic mockDest = Mockito.mock(Topic.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TOPIC_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationTemporaryTopicUsingByteAnnotation() throws Exception {
|
||||
TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.TEMP_TOPIC_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSDestinationUnkownUsingByteAnnotation() throws Exception {
|
||||
Destination mockDest = Mockito.mock(Destination.class);
|
||||
|
||||
doTestConvertMessageWithJMSDestination(mockDest, JMSVendor.QUEUE_TYPE, true);
|
||||
}
|
||||
|
||||
private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue, boolean byteType) throws Exception {
|
||||
TextMessage mockTextMessage = createMockTextMessage();
|
||||
Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
|
||||
Mockito.when(mockTextMessage.getJMSDestination()).thenReturn(jmsDestination);
|
||||
JMSVendor mockVendor = createMockVendor();
|
||||
String toAddress = "someToAddress";
|
||||
if (jmsDestination != null) {
|
||||
Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(toAddress);
|
||||
}
|
||||
|
||||
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
|
||||
if (byteType) {
|
||||
transformer.setUseByteDestinationTypeAnnotations(true);
|
||||
}
|
||||
|
||||
Message amqp = transformer.convert(mockTextMessage);
|
||||
|
||||
MessageAnnotations ma = amqp.getMessageAnnotations();
|
||||
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
|
||||
if (maMap != null) {
|
||||
Object actualValue = maMap.get(Symbol.valueOf("x-opt-to-type"));
|
||||
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
|
||||
} else if (expectedAnnotationValue != null) {
|
||||
fail("Expected annotation value, but there were no annotations");
|
||||
}
|
||||
|
||||
if (jmsDestination != null) {
|
||||
assertEquals("Unexpected 'to' address", toAddress, amqp.getAddress());
|
||||
}
|
||||
}
|
||||
|
||||
// ======= JMSReplyTo Handling =========
|
||||
|
||||
// --- String type annotation ---
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToNull() throws Exception {
|
||||
doTestConvertMessageWithJMSReplyTo(null, null, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToQueue() throws Exception {
|
||||
Queue mockDest = Mockito.mock(Queue.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, "queue", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception {
|
||||
TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,queue", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToTopic() throws Exception {
|
||||
Topic mockDest = Mockito.mock(Topic.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, "topic", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception {
|
||||
TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,topic", false);
|
||||
}
|
||||
|
||||
// --- byte type annotation ---
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToNullUsingByteAnnotation() throws Exception {
|
||||
doTestConvertMessageWithJMSReplyTo(null, null, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToQueueUsingByteAnnotation() throws Exception {
|
||||
Queue mockDest = Mockito.mock(Queue.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.QUEUE_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToTemporaryQueueUsingByteAnnotation() throws Exception {
|
||||
TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TEMP_QUEUE_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToTopicUsingByteAnnotation() throws Exception {
|
||||
Topic mockDest = Mockito.mock(Topic.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TOPIC_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToTemporaryTopicUsingByteAnnotation() throws Exception {
|
||||
TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.TEMP_TOPIC_TYPE, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertMessageWithJMSReplyToUnkownUsingByteAnnotation() throws Exception {
|
||||
Destination mockDest = Mockito.mock(Destination.class);
|
||||
|
||||
doTestConvertMessageWithJMSReplyTo(mockDest, JMSVendor.QUEUE_TYPE, true);
|
||||
}
|
||||
|
||||
private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue, boolean byteType) throws Exception {
|
||||
TextMessage mockTextMessage = createMockTextMessage();
|
||||
Mockito.when(mockTextMessage.getText()).thenReturn("myTextMessageContent");
|
||||
Mockito.when(mockTextMessage.getJMSReplyTo()).thenReturn(jmsReplyTo);
|
||||
JMSVendor mockVendor = createMockVendor();
|
||||
String replyToAddress = "someReplyToAddress";
|
||||
if (jmsReplyTo != null) {
|
||||
Mockito.when(mockVendor.toAddress(Mockito.any(Destination.class))).thenReturn(replyToAddress);
|
||||
}
|
||||
|
||||
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer(mockVendor);
|
||||
if (byteType) {
|
||||
transformer.setUseByteDestinationTypeAnnotations(true);
|
||||
}
|
||||
|
||||
Message amqp = transformer.convert(mockTextMessage);
|
||||
|
||||
MessageAnnotations ma = amqp.getMessageAnnotations();
|
||||
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
|
||||
if (maMap != null) {
|
||||
Object actualValue = maMap.get(Symbol.valueOf("x-opt-reply-type"));
|
||||
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
|
||||
} else if (expectedAnnotationValue != null) {
|
||||
fail("Expected annotation value, but there were no annotations");
|
||||
}
|
||||
|
||||
if (jmsReplyTo != null) {
|
||||
assertEquals("Unexpected 'reply-to' address", replyToAddress, amqp.getReplyTo());
|
||||
}
|
||||
}
|
||||
|
||||
// ======= Utility Methods =========
|
||||
|
||||
private TextMessage createMockTextMessage() throws Exception {
|
||||
TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
|
||||
Mockito.when(mockTextMessage.getPropertyNames()).thenReturn(Collections.enumeration(Collections.emptySet()));
|
||||
|
||||
return mockTextMessage;
|
||||
}
|
||||
|
||||
private JMSVendor createMockVendor() {
|
||||
JMSVendor mockVendor = Mockito.mock(JMSVendor.class);
|
||||
|
||||
return mockVendor;
|
||||
}
|
||||
}
|
|
@ -67,11 +67,9 @@
|
|||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<version>${mockito-version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
|
7
pom.xml
7
pom.xml
|
@ -637,7 +637,6 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- for custom XML parsing -->
|
||||
<dependency>
|
||||
<groupId>org.apache.xbean</groupId>
|
||||
|
@ -973,6 +972,12 @@
|
|||
<version>${junit-version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<version>${mockito-version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jmock</groupId>
|
||||
<artifactId>jmock-junit4</artifactId>
|
||||
|
|
Loading…
Reference in New Issue