mirror of https://github.com/apache/activemq.git
amqp - support configurable transformers and populate message properties for the default native one
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1402148 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f81b52e28a
commit
b8a6e5a6ca
|
@ -363,8 +363,24 @@ class AmqpProtocolConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
InboundTransformer inboundTransformer;
|
||||||
InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
|
||||||
|
protected InboundTransformer getInboundTransformer() {
|
||||||
|
if (inboundTransformer == null) {
|
||||||
|
String transformer = amqpTransport.getTransformer();
|
||||||
|
if (transformer.equals(InboundTransformer.TRANSFORMER_JMS)) {
|
||||||
|
inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
||||||
|
} else if (transformer.equals(InboundTransformer.TRANSFORMER_NATIVE)) {
|
||||||
|
inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
||||||
|
} else if (transformer.equals(InboundTransformer.TRANSFORMER_RAW)) {
|
||||||
|
inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
||||||
|
} else {
|
||||||
|
LOG.warn("Unknown transformer type " + transformer + ", using native one instead");
|
||||||
|
inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return inboundTransformer;
|
||||||
|
}
|
||||||
|
|
||||||
abstract class BaseProducerContext extends AmqpDeliveryListener {
|
abstract class BaseProducerContext extends AmqpDeliveryListener {
|
||||||
|
|
||||||
|
@ -419,7 +435,7 @@ class AmqpProtocolConverter {
|
||||||
@Override
|
@Override
|
||||||
protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
|
protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
|
||||||
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
|
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
|
||||||
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
|
final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
|
||||||
current = null;
|
current = null;
|
||||||
|
|
||||||
if( message.getDestination()==null ) {
|
if( message.getDestination()==null ) {
|
||||||
|
@ -587,7 +603,7 @@ class AmqpProtocolConverter {
|
||||||
|
|
||||||
private Source createSource(ActiveMQDestination dest) {
|
private Source createSource(ActiveMQDestination dest) {
|
||||||
org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
|
org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
|
||||||
rc.setAddress(inboundTransformer.getVendor().toAddress(dest));
|
rc.setAddress(getInboundTransformer().getVendor().toAddress(dest));
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,4 +40,6 @@ public interface AmqpTransport {
|
||||||
public AmqpWireFormat getWireFormat();
|
public AmqpWireFormat getWireFormat();
|
||||||
|
|
||||||
public void stop() throws Exception;
|
public void stop() throws Exception;
|
||||||
|
|
||||||
|
public String getTransformer();
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
import org.apache.activemq.transport.TransportFilter;
|
import org.apache.activemq.transport.TransportFilter;
|
||||||
import org.apache.activemq.transport.TransportListener;
|
import org.apache.activemq.transport.TransportListener;
|
||||||
|
import org.apache.activemq.transport.amqp.transform.InboundTransformer;
|
||||||
import org.apache.activemq.transport.tcp.SslTransport;
|
import org.apache.activemq.transport.tcp.SslTransport;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.wireformat.WireFormat;
|
import org.apache.activemq.wireformat.WireFormat;
|
||||||
|
@ -46,6 +47,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
|
||||||
private AmqpWireFormat wireFormat;
|
private AmqpWireFormat wireFormat;
|
||||||
|
|
||||||
private boolean trace;
|
private boolean trace;
|
||||||
|
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
|
||||||
|
|
||||||
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
|
public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
|
||||||
super(next);
|
super(next);
|
||||||
|
@ -161,4 +163,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public String getTransformer() {
|
||||||
|
return transformer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTransformer(String transformer) {
|
||||||
|
this.transformer = transformer;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ import javax.jms.Message;
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
*/
|
*/
|
||||||
public class AMQPNativeInboundTransformer extends InboundTransformer {
|
public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
|
||||||
|
|
||||||
|
|
||||||
public AMQPNativeInboundTransformer(JMSVendor vendor) {
|
public AMQPNativeInboundTransformer(JMSVendor vendor) {
|
||||||
|
@ -31,21 +31,11 @@ public class AMQPNativeInboundTransformer extends InboundTransformer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||||
|
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
|
||||||
|
|
||||||
BytesMessage rc = vendor.createBytesMessage();
|
Message rc = super.transform(amqpMessage);
|
||||||
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
|
|
||||||
|
|
||||||
rc.setJMSDeliveryMode(defaultDeliveryMode);
|
populateMessage(rc, amqp);
|
||||||
rc.setJMSPriority(defaultPriority);
|
|
||||||
|
|
||||||
final long now = System.currentTimeMillis();
|
|
||||||
rc.setJMSTimestamp(now);
|
|
||||||
if( defaultTtl > 0 ) {
|
|
||||||
rc.setJMSExpiration(now + defaultTtl);
|
|
||||||
}
|
|
||||||
|
|
||||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
|
||||||
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.Message;
|
||||||
|
|
||||||
|
public class AMQPRawInboundTransformer extends InboundTransformer {
|
||||||
|
|
||||||
|
public AMQPRawInboundTransformer(JMSVendor vendor) {
|
||||||
|
super(vendor);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||||
|
BytesMessage rc = vendor.createBytesMessage();
|
||||||
|
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
|
||||||
|
|
||||||
|
rc.setJMSDeliveryMode(defaultDeliveryMode);
|
||||||
|
rc.setJMSPriority(defaultPriority);
|
||||||
|
|
||||||
|
final long now = System.currentTimeMillis();
|
||||||
|
rc.setJMSTimestamp(now);
|
||||||
|
if( defaultTtl > 0 ) {
|
||||||
|
rc.setJMSExpiration(now + defaultTtl);
|
||||||
|
}
|
||||||
|
|
||||||
|
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||||
|
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.amqp.transform;
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
|
import org.apache.qpid.proton.message.Message;
|
||||||
import org.apache.qpid.proton.type.Binary;
|
import org.apache.qpid.proton.type.Binary;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,4 +34,19 @@ public class EncodedMessage extends Binary {
|
||||||
public long getMessageFormat() {
|
public long getMessageFormat() {
|
||||||
return messageFormat;
|
return messageFormat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Message decode() throws Exception {
|
||||||
|
Message amqp = new Message();
|
||||||
|
|
||||||
|
int offset = getArrayOffset();
|
||||||
|
int len = getLength();
|
||||||
|
while( len > 0 ) {
|
||||||
|
final int decoded = amqp.decode(getArray(), offset, len);
|
||||||
|
assert decoded > 0: "Make progress decoding the message";
|
||||||
|
offset += decoded;
|
||||||
|
len -= decoded;
|
||||||
|
}
|
||||||
|
|
||||||
|
return amqp;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,20 @@
|
||||||
package org.apache.activemq.transport.amqp.transform;
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
import org.apache.qpid.proton.engine.Delivery;
|
import org.apache.qpid.proton.engine.Delivery;
|
||||||
|
import org.apache.qpid.proton.type.Binary;
|
||||||
|
import org.apache.qpid.proton.type.messaging.ApplicationProperties;
|
||||||
|
import org.apache.qpid.proton.type.messaging.DeliveryAnnotations;
|
||||||
|
import org.apache.qpid.proton.type.messaging.Footer;
|
||||||
|
import org.apache.qpid.proton.type.messaging.Header;
|
||||||
|
import org.apache.qpid.proton.type.messaging.MessageAnnotations;
|
||||||
|
import org.apache.qpid.proton.type.messaging.Properties;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
@ -27,7 +38,16 @@ import java.io.IOException;
|
||||||
public abstract class InboundTransformer {
|
public abstract class InboundTransformer {
|
||||||
|
|
||||||
JMSVendor vendor;
|
JMSVendor vendor;
|
||||||
|
|
||||||
|
public static final String TRANSFORMER_NATIVE = "native";
|
||||||
|
public static final String TRANSFORMER_RAW = "raw";
|
||||||
|
public static final String TRANSFORMER_JMS = "jms";
|
||||||
|
|
||||||
String prefixVendor = "JMS_AMQP_";
|
String prefixVendor = "JMS_AMQP_";
|
||||||
|
String prefixDeliveryAnnotations = "DA_";
|
||||||
|
String prefixMessageAnnotations= "MA_";
|
||||||
|
String prefixFooter = "FT_";
|
||||||
|
|
||||||
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
|
int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
|
||||||
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
|
int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY;
|
||||||
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
|
long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
|
||||||
|
@ -77,4 +97,111 @@ public abstract class InboundTransformer {
|
||||||
public void setVendor(JMSVendor vendor) {
|
public void setVendor(JMSVendor vendor) {
|
||||||
this.vendor = vendor;
|
this.vendor = vendor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
|
||||||
|
final Header header = amqp.getHeader();
|
||||||
|
if( header!=null ) {
|
||||||
|
if( header.getDurable()!=null ) {
|
||||||
|
jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||||
|
}
|
||||||
|
if( header.getPriority()!=null ) {
|
||||||
|
jms.setJMSPriority(header.getPriority().intValue());
|
||||||
|
}
|
||||||
|
if( header.getTtl()!=null ) {
|
||||||
|
jms.setJMSExpiration(header.getTtl().longValue());
|
||||||
|
}
|
||||||
|
if( header.getFirstAcquirer() !=null ) {
|
||||||
|
jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
|
||||||
|
}
|
||||||
|
if( header.getDeliveryCount()!=null ) {
|
||||||
|
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
|
||||||
|
if( da!=null ) {
|
||||||
|
for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
|
||||||
|
String key = entry.getKey().toString();
|
||||||
|
setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final MessageAnnotations ma = amqp.getMessageAnnotations();
|
||||||
|
if( ma!=null ) {
|
||||||
|
for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
|
||||||
|
String key = entry.getKey().toString();
|
||||||
|
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final Properties properties = amqp.getProperties();
|
||||||
|
if( properties!=null ) {
|
||||||
|
if( properties.getMessageId()!=null ) {
|
||||||
|
jms.setJMSMessageID(properties.getMessageId().toString());
|
||||||
|
}
|
||||||
|
Binary userId = properties.getUserId();
|
||||||
|
if( userId!=null ) {
|
||||||
|
vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
|
||||||
|
}
|
||||||
|
if( properties.getTo()!=null ) {
|
||||||
|
jms.setJMSDestination(vendor.createDestination(properties.getTo()));
|
||||||
|
}
|
||||||
|
if( properties.getSubject()!=null ) {
|
||||||
|
jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
|
||||||
|
}
|
||||||
|
if( properties.getReplyTo() !=null ) {
|
||||||
|
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
|
||||||
|
}
|
||||||
|
if( properties.getCorrelationId() !=null ) {
|
||||||
|
jms.setJMSCorrelationID(properties.getCorrelationId().toString());
|
||||||
|
}
|
||||||
|
if( properties.getContentType() !=null ) {
|
||||||
|
jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
|
||||||
|
}
|
||||||
|
if( properties.getContentEncoding() !=null ) {
|
||||||
|
jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
|
||||||
|
}
|
||||||
|
if( properties.getCreationTime()!=null ) {
|
||||||
|
jms.setJMSTimestamp(properties.getCreationTime().getTime());
|
||||||
|
}
|
||||||
|
if( properties.getGroupId()!=null ) {
|
||||||
|
vendor.setJMSXGroupID(jms, properties.getGroupId());
|
||||||
|
}
|
||||||
|
if( properties.getGroupSequence()!=null ) {
|
||||||
|
vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue());
|
||||||
|
}
|
||||||
|
if( properties.getReplyToGroupId()!=null ) {
|
||||||
|
jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final ApplicationProperties ap = amqp.getApplicationProperties();
|
||||||
|
if( ap !=null ) {
|
||||||
|
for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
|
||||||
|
String key = entry.getKey().toString();
|
||||||
|
setProperty(jms, key, entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final Footer fp = amqp.getFooter();
|
||||||
|
if( fp !=null ) {
|
||||||
|
for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
|
||||||
|
String key = entry.getKey().toString();
|
||||||
|
setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setProperty(Message msg, String key, Object value) throws JMSException {
|
||||||
|
//TODO support all types
|
||||||
|
if( value instanceof String ) {
|
||||||
|
msg.setStringProperty(key, (String) value);
|
||||||
|
} else if( value instanceof Integer ) {
|
||||||
|
msg.setIntProperty(key, ((Integer) value).intValue());
|
||||||
|
} else if( value instanceof Long ) {
|
||||||
|
msg.setLongProperty(key, ((Long) value).longValue());
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("Unexpected value type: "+value.getClass());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,26 +30,13 @@ import java.util.Set;
|
||||||
*/
|
*/
|
||||||
public class JMSMappingInboundTransformer extends InboundTransformer {
|
public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||||
|
|
||||||
String prefixDeliveryAnnotations = "DA_";
|
|
||||||
String prefixMessageAnnotations= "MA_";
|
|
||||||
String prefixFooter = "FT_";
|
|
||||||
|
|
||||||
public JMSMappingInboundTransformer(JMSVendor vendor) {
|
public JMSMappingInboundTransformer(JMSVendor vendor) {
|
||||||
super(vendor);
|
super(vendor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||||
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
|
org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
|
||||||
|
|
||||||
int offset = amqpMessage.getArrayOffset();
|
|
||||||
int len = amqpMessage.getLength();
|
|
||||||
while( len > 0 ) {
|
|
||||||
final int decoded = amqp.decode(amqpMessage.getArray(), offset, len);
|
|
||||||
assert decoded > 0: "Make progress decoding the message";
|
|
||||||
offset += decoded;
|
|
||||||
len -= decoded;
|
|
||||||
}
|
|
||||||
|
|
||||||
Message rc;
|
Message rc;
|
||||||
final Section body = amqp.getBody();
|
final Section body = amqp.getBody();
|
||||||
|
@ -105,113 +92,10 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||||
rc.setJMSPriority(defaultPriority);
|
rc.setJMSPriority(defaultPriority);
|
||||||
rc.setJMSExpiration(defaultTtl);
|
rc.setJMSExpiration(defaultTtl);
|
||||||
|
|
||||||
final Header header = amqp.getHeader();
|
populateMessage(rc, amqp);
|
||||||
if( header!=null ) {
|
|
||||||
if( header.getDurable()!=null ) {
|
|
||||||
rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
|
||||||
}
|
|
||||||
if( header.getPriority()!=null ) {
|
|
||||||
rc.setJMSPriority(header.getPriority().intValue());
|
|
||||||
}
|
|
||||||
if( header.getTtl()!=null ) {
|
|
||||||
rc.setJMSExpiration(header.getTtl().longValue());
|
|
||||||
}
|
|
||||||
if( header.getFirstAcquirer() !=null ) {
|
|
||||||
rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
|
|
||||||
}
|
|
||||||
if( header.getDeliveryCount()!=null ) {
|
|
||||||
vendor.setJMSXDeliveryCount(rc, header.getDeliveryCount().longValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
|
|
||||||
if( da!=null ) {
|
|
||||||
for (Map.Entry entry : (Set<Map.Entry>)da.getValue().entrySet()) {
|
|
||||||
String key = entry.getKey().toString();
|
|
||||||
setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final MessageAnnotations ma = amqp.getMessageAnnotations();
|
|
||||||
if( ma!=null ) {
|
|
||||||
for (Map.Entry entry : (Set<Map.Entry>)ma.getValue().entrySet()) {
|
|
||||||
String key = entry.getKey().toString();
|
|
||||||
setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final Properties properties = amqp.getProperties();
|
|
||||||
if( properties!=null ) {
|
|
||||||
if( properties.getMessageId()!=null ) {
|
|
||||||
rc.setJMSMessageID(properties.getMessageId().toString());
|
|
||||||
}
|
|
||||||
Binary userId = properties.getUserId();
|
|
||||||
if( userId!=null ) {
|
|
||||||
vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
|
|
||||||
}
|
|
||||||
if( properties.getTo()!=null ) {
|
|
||||||
rc.setJMSDestination(vendor.createDestination(properties.getTo()));
|
|
||||||
}
|
|
||||||
if( properties.getSubject()!=null ) {
|
|
||||||
rc.setStringProperty(prefixVendor + "Subject", properties.getSubject());
|
|
||||||
}
|
|
||||||
if( properties.getReplyTo() !=null ) {
|
|
||||||
rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
|
|
||||||
}
|
|
||||||
if( properties.getCorrelationId() !=null ) {
|
|
||||||
rc.setJMSCorrelationID(properties.getCorrelationId().toString());
|
|
||||||
}
|
|
||||||
if( properties.getContentType() !=null ) {
|
|
||||||
rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
|
|
||||||
}
|
|
||||||
if( properties.getContentEncoding() !=null ) {
|
|
||||||
rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString());
|
|
||||||
}
|
|
||||||
if( properties.getCreationTime()!=null ) {
|
|
||||||
rc.setJMSTimestamp(properties.getCreationTime().getTime());
|
|
||||||
}
|
|
||||||
if( properties.getGroupId()!=null ) {
|
|
||||||
vendor.setJMSXGroupID(rc, properties.getGroupId());
|
|
||||||
}
|
|
||||||
if( properties.getGroupSequence()!=null ) {
|
|
||||||
vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue());
|
|
||||||
}
|
|
||||||
if( properties.getReplyToGroupId()!=null ) {
|
|
||||||
rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final ApplicationProperties ap = amqp.getApplicationProperties();
|
|
||||||
if( ap !=null ) {
|
|
||||||
for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
|
|
||||||
String key = entry.getKey().toString();
|
|
||||||
setProperty(rc, key, entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final Footer fp = amqp.getFooter();
|
|
||||||
if( fp !=null ) {
|
|
||||||
for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
|
|
||||||
String key = entry.getKey().toString();
|
|
||||||
setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||||
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
|
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setProperty(Message msg, String key, Object value) throws JMSException {
|
|
||||||
//TODO support all types
|
|
||||||
if( value instanceof String ) {
|
|
||||||
msg.setStringProperty(key, (String) value);
|
|
||||||
} else if( value instanceof Integer ) {
|
|
||||||
msg.setIntProperty(key, ((Integer) value).intValue());
|
|
||||||
} else if( value instanceof Long ) {
|
|
||||||
msg.setLongProperty(key, ((Long) value).longValue());
|
|
||||||
} else {
|
|
||||||
throw new RuntimeException("Unexpected value type: "+value.getClass());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue