From cac8c9c385c18ce665201d6fd7f7b0856569b815 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 4 Oct 2012 21:00:25 +0000 Subject: [PATCH] Improving the AQMP<-->JMS message mapping impl. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1394264 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/amqp/ActiveMQJMSVendor.java | 10 ++ .../transport/amqp/AmqpProtocolConverter.java | 23 +-- .../AMQPNativeInboundTransformer.java | 6 +- .../AMQPNativeOutboundTransformer.java | 32 ++-- .../transform/AutoOutboundTransformer.java | 48 +++++ .../transform/DroppingWritableBuffer.java | 91 ++++++++++ .../amqp/transform/EncodedMessage.java | 36 ++++ .../amqp/transform/InboundTransformer.java | 2 +- .../JMSMappingInboundTransformer.java | 10 +- .../JMSMappingOutboundTransformer.java | 170 ++++++++++++++++-- .../transport/amqp/transform/JMSVendor.java | 11 +- .../amqp/transform/OutboundTransformer.java | 2 +- 12 files changed, 388 insertions(+), 53 deletions(-) create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java index 37b23e1b8b..1c300cf5f0 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java @@ -64,4 +64,14 @@ public class ActiveMQJMSVendor extends JMSVendor { public void setJMSXGroupSequence(Message msg, int value) { ((ActiveMQMessage)msg).setGroupSequence(value); } + + @Override + public void setJMSXDeliveryCount(Message msg, long value) { + ((ActiveMQMessage)msg).setRedeliveryCounter((int) value); + } + + @Override + public String toAddress(Destination dest) { + return ((ActiveMQDestination)dest).getQualifiedName(); + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 7f3ff0af2d..eadb9c59bc 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -289,7 +289,7 @@ class AmqpProtocolConverter { } } - InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); class ProducerContext extends AmqpDeliveryListener { private final ProducerId producerId; @@ -322,7 +322,8 @@ class AmqpProtocolConverter { } final Buffer buffer = current.toBuffer(); - final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length); + EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length); + final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em); current = null; if( message.getDestination()==null ) { @@ -365,7 +366,7 @@ class AmqpProtocolConverter { } - OutboundTransformer outboundTransformer = new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE); + OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE); class ConsumerContext extends AmqpDeliveryListener { private final ConsumerId consumerId; @@ -432,17 +433,19 @@ class AmqpProtocolConverter { } final MessageDispatch md = outbound.removeFirst(); - final byte[] tag = nextTag(); - final Delivery delivery = sender.delivery(tag, 0, tag.length); - delivery.setContext(md); - try { final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage(); - final byte[] amqpMessage = outboundTransformer.transform(jms); - if( amqpMessage!=null && amqpMessage.length > 0 ) { - current = new Buffer(amqpMessage); + final EncodedMessage amqp = outboundTransformer.transform(jms); + if( amqp!=null && amqp.getLength() > 0 ) { + + current = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength()); + final byte[] tag = nextTag(); + final Delivery delivery = sender.delivery(tag, 0, tag.length); + delivery.setContext(md); + } else { // TODO: message could not be generated what now? + } } catch (Exception e) { e.printStackTrace(); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java index ebce1b8176..e80b9c2520 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java @@ -30,10 +30,10 @@ public class AMQPNativeInboundTransformer extends InboundTransformer { } @Override - public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception { + public Message transform(EncodedMessage amqpMessage) throws Exception { BytesMessage rc = vendor.createBytesMessage(); - rc.writeBytes(amqpMessage, offset, len); + rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); rc.setJMSDeliveryMode(defaultDeliveryMode); rc.setJMSPriority(defaultPriority); @@ -44,7 +44,7 @@ public class AMQPNativeInboundTransformer extends InboundTransformer { rc.setJMSExpiration(now + defaultTtl); } - rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat); + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); rc.setBooleanProperty(prefixVendor + "NATIVE", true); return rc; } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java index 8e2f1a9262..fb2a991e14 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp.transform; import javax.jms.BytesMessage; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageFormatException; @@ -30,30 +31,31 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer { } @Override - public byte[] transform(Message jms) throws Exception { - if( jms == null ) + public EncodedMessage transform(Message msg) throws Exception { + if( msg == null ) return null; - if( !(jms instanceof BytesMessage) ) + if( !(msg instanceof BytesMessage) ) return null; - - long messageFormat; try { - if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) { + if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) { return null; } - messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT"); } catch (MessageFormatException e) { return null; } - - // TODO: Proton should probably expose a way to set the msg format - // delivery.settMessageFormat(messageFormat); - - BytesMessage bytesMessage = (BytesMessage) jms; - byte data[] = new byte[(int) bytesMessage.getBodyLength()]; - bytesMessage.readBytes(data); - return data; + return transform(this, (BytesMessage) msg); } + static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { + long messageFormat; + try { + messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT"); + } catch (MessageFormatException e) { + return null; + } + byte data[] = new byte[(int) msg.getBodyLength()]; + msg.readBytes(data); + return new EncodedMessage(messageFormat, data, 0, data.length); + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java new file mode 100644 index 0000000000..22edc7ce4b --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.transform; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageFormatException; + +/** +* @author Hiram Chirino +*/ +public class AutoOutboundTransformer extends JMSMappingOutboundTransformer { + + public AutoOutboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public EncodedMessage transform(Message msg) throws Exception { + if( msg == null ) + return null; + if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) { + if( msg instanceof BytesMessage ) { + return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg); + } else { + return null; + } + } else { + return JMSMappingOutboundTransformer.transform(this, msg); + } + } + +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java new file mode 100644 index 0000000000..a70c156c13 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.activemq.transport.amqp.transform; + +import org.apache.qpid.proton.codec.WritableBuffer; + +import java.nio.ByteBuffer; + +public class DroppingWritableBuffer implements WritableBuffer +{ + int pos = 0; + + @Override + public boolean hasRemaining() { + return true; + } + + @Override + public void put(byte b) { + pos += 1; + } + + @Override + public void putFloat(float f) { + pos += 4; + } + + @Override + public void putDouble(double d) { + pos += 8; + } + + @Override + public void put(byte[] src, int offset, int length) { + pos += length; + } + + @Override + public void putShort(short s) { + pos += 2; + } + + @Override + public void putInt(int i) { + pos += 4; + } + + @Override + public void putLong(long l) { + pos += 8; + } + + @Override + public int remaining() { + return Integer.MAX_VALUE - pos; + } + + @Override + public int position() { + return pos; + } + + @Override + public void position(int position) { + pos = position; + } + + @Override + public void put(ByteBuffer payload) { + pos += payload.remaining(); + payload.position(payload.limit()); + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java new file mode 100644 index 0000000000..229a0f80a5 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.transform; + +import org.apache.qpid.proton.type.Binary; + +/** + * @author Hiram Chirino + */ +public class EncodedMessage extends Binary { + + final long messageFormat; + + public EncodedMessage(long messageFormat, byte[] data, int offset, int length) { + super(data, offset, length); + this.messageFormat = messageFormat; + } + + public long getMessageFormat() { + return messageFormat; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java index a1a86bfffa..5870a84963 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java @@ -36,7 +36,7 @@ public abstract class InboundTransformer { this.vendor = vendor; } - abstract public Message transform(long messageFormat, byte [] data, int offset, int len) throws Exception; + abstract public Message transform(EncodedMessage amqpMessage) throws Exception; public int getDefaultDeliveryMode() { return defaultDeliveryMode; diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java index 78991d47ce..3c96b3526f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java @@ -38,11 +38,13 @@ public class JMSMappingInboundTransformer extends InboundTransformer { } @Override - public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception { + public Message transform(EncodedMessage amqpMessage) throws Exception { org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message(); + int offset = amqpMessage.getArrayOffset(); + int len = amqpMessage.getLength(); while( len > 0 ) { - final int decoded = amqp.decode(amqpMessage, offset, len); + final int decoded = amqp.decode(amqpMessage.getArray(), offset, len); assert decoded > 0: "Make progress decoding the message"; offset += decoded; len -= decoded; @@ -110,7 +112,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer { rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); } if( header.getDeliveryCount()!=null ) { - rc.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue()); + vendor.setJMSXDeliveryCount(rc, header.getDeliveryCount().longValue()); } } @@ -187,7 +189,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer { } } - rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat); + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); rc.setBooleanProperty(prefixVendor + "NATIVE", false); return rc; } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java index 5f4822deb3..9cc9df1187 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java @@ -16,43 +16,181 @@ */ package org.apache.activemq.transport.amqp.transform; -import javax.jms.BytesMessage; -import javax.jms.Message; -import javax.jms.MessageFormatException; +import org.apache.qpid.proton.codec.CompositeWritableBuffer; +import org.apache.qpid.proton.codec.WritableBuffer; +import org.apache.qpid.proton.type.Binary; +import org.apache.qpid.proton.type.Symbol; +import org.apache.qpid.proton.type.UnsignedByte; +import org.apache.qpid.proton.type.UnsignedInteger; +import org.apache.qpid.proton.type.messaging.*; + +import javax.jms.*; +import java.io.ByteArrayOutputStream; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; /** * @author Hiram Chirino */ public class JMSMappingOutboundTransformer extends OutboundTransformer { + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations= "MA_"; + String prefixFooter = "FT_"; public JMSMappingOutboundTransformer(JMSVendor vendor) { super(vendor); } @Override - public byte[] transform(Message jms) throws Exception { - if( jms == null ) + public EncodedMessage transform(Message msg) throws Exception { + if( msg == null ) return null; - if( !(jms instanceof BytesMessage) ) + try { + if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) { + return null; + } + } catch (MessageFormatException e) { return null; + } + return transform(this, msg); + } + + static EncodedMessage transform(JMSMappingOutboundTransformer options, Message msg) throws JMSException, UnsupportedEncodingException { + final JMSVendor vendor = options.vendor; + + final String messageFormatKey = options.prefixVendor + "MESSAGE_FORMAT"; + final String nativeKey = options.prefixVendor + "NATIVE"; + final String firstAcquirerKey = options.prefixVendor + "FirstAcquirer"; + final String prefixDeliveryAnnotationsKey = options.prefixVendor + options.prefixDeliveryAnnotations; + final String prefixMessageAnnotationsKey = options.prefixVendor + options.prefixMessageAnnotations; + final String subjectKey = options.prefixVendor +"Subject"; + final String contentTypeKey = options.prefixVendor +"ContentType"; + final String contentEncodingKey = options.prefixVendor +"ContentEncoding"; + final String replyToGroupIDKey = options.prefixVendor +"ReplyToGroupID"; + final String prefixFooterKey = options.prefixVendor + options.prefixFooter; long messageFormat; try { - if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) { - return null; - } - messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT"); + messageFormat = msg.getLongProperty(messageFormatKey); } catch (MessageFormatException e) { return null; } - // TODO: Proton should probably expose a way to set the msg format - // delivery.settMessageFormat(messageFormat); + Header header = new Header(); + Properties props=new Properties(); + HashMap daMap = null; + HashMap maMap = null; + HashMap apMap = null; + Section body=null; + HashMap footerMap = null; + if( msg instanceof BytesMessage ) { + BytesMessage m = (BytesMessage)msg; + byte data[] = new byte[(int) m.getBodyLength()]; + m.readBytes(data); + body = new Data(new Binary(data)); + } if( msg instanceof TextMessage ) { + body = new AmqpValue(((TextMessage) msg).getText()); + } if( msg instanceof MapMessage ) { + throw new RuntimeException("Not implemented"); + } if( msg instanceof StreamMessage ) { + throw new RuntimeException("Not implemented"); + } if( msg instanceof ObjectMessage ) { + throw new RuntimeException("Not implemented"); + } - BytesMessage bytesMessage = (BytesMessage) jms; - byte data[] = new byte[(int) bytesMessage.getBodyLength()]; - bytesMessage.readBytes(data); - return data; + header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); + header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); + if( msg.getJMSExpiration() != 0 ) { + header.setTtl(new UnsignedInteger((int) msg.getJMSExpiration())); + } + if( msg.getJMSType()!=null ) { + if( maMap==null ) maMap = new HashMap(); + maMap.put("x-opt-jms-type", msg.getJMSType()); + } + if( msg.getJMSMessageID()!=null ) { + props.setMessageId(msg.getJMSMessageID()); + } + if( msg.getJMSDestination()!=null ) { + props.setTo(vendor.toAddress(msg.getJMSDestination())); + } + if( msg.getJMSReplyTo()!=null ) { + props.setReplyTo(vendor.toAddress(msg.getJMSDestination())); + } + if( msg.getJMSCorrelationID()!=null ) { + props.setCorrelationId(msg.getJMSCorrelationID()); + } + if( msg.getJMSExpiration() != 0 ) { + props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration())); + } + if( msg.getJMSTimestamp()!= 0 ) { + props.setCreationTime(new Date(msg.getJMSTimestamp())); + } + + final Enumeration keys = msg.getPropertyNames(); + while (keys.hasMoreElements()) { + String key = (String) keys.nextElement(); + if( key.equals(messageFormatKey) || key.equals(nativeKey)) { + // skip.. + } else if( key.equals(firstAcquirerKey) ) { + header.setFirstAcquirer(msg.getBooleanProperty(key)); + } else if( key.startsWith("JMSXDeliveryCount") ) { + header.setDeliveryCount(new UnsignedInteger(msg.getIntProperty(key))); + } else if( key.startsWith("JMSXUserID") ) { + props.setUserId(new Binary(msg.getStringProperty(key).getBytes("UTF-8"))); + } else if( key.startsWith("JMSXGroupID") ) { + props.setGroupId(msg.getStringProperty(key)); + } else if( key.startsWith("JMSXGroupSeq") ) { + props.setGroupSequence(new UnsignedInteger(msg.getIntProperty(key))); + } else if( key.startsWith(prefixDeliveryAnnotationsKey) ) { + if( daMap == null ) daMap = new HashMap(); + String name = key.substring(prefixDeliveryAnnotationsKey.length()); + daMap.put(name, msg.getObjectProperty(key)); + } else if( key.startsWith(prefixMessageAnnotationsKey) ) { + if( maMap==null ) maMap = new HashMap(); + String name = key.substring(prefixMessageAnnotationsKey.length()); + maMap.put(name, msg.getObjectProperty(key)); + } else if( key.equals(subjectKey) ) { + props.setSubject(msg.getStringProperty(key)); + } else if( key.equals(contentTypeKey) ) { + props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); + } else if( key.equals(contentEncodingKey) ) { + props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key))); + } else if( key.equals(replyToGroupIDKey) ) { + props.setReplyToGroupId(msg.getStringProperty(key)); + } else if( key.startsWith(prefixFooterKey) ) { + if( footerMap==null ) footerMap = new HashMap(); + String name = key.substring(prefixFooterKey.length()); + footerMap.put(name, msg.getObjectProperty(key)); + } else { + if( apMap==null ) apMap = new HashMap(); + apMap.put(key, msg.getObjectProperty(key)); + } + } + + + MessageAnnotations ma=null; + if( maMap!=null ) ma = new MessageAnnotations(maMap); + DeliveryAnnotations da=null; + if( daMap!=null ) da = new DeliveryAnnotations(daMap); + ApplicationProperties ap=null; + if( apMap!=null ) ap = new ApplicationProperties(apMap); + Footer footer=null; + if( footerMap!=null ) footer = new Footer(footerMap); + + org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message(header, da, ma, props, ap, body, footer); + + ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]); + final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); + int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow)); + if( overflow.position() > 0 ) { + buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]); + c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer)); + } + + return new EncodedMessage(messageFormat, buffer.array(), 0, c); } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java index e7571edd42..69dec5f30e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java @@ -19,11 +19,16 @@ abstract public class JMSVendor { public abstract MapMessage createMapMessage(); - public abstract void setJMSXUserID(Message jms, String value); + public abstract void setJMSXUserID(Message msg, String value); public abstract Destination createDestination(String name); - public abstract void setJMSXGroupID(Message jms, String groupId); + public abstract void setJMSXGroupID(Message msg, String groupId); + + public abstract void setJMSXGroupSequence(Message msg, int i); + + public abstract void setJMSXDeliveryCount(Message rc, long l); + + public abstract String toAddress(Destination msgDestination); - public abstract void setJMSXGroupSequence(Message jms, int i); } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java index 74a44c51be..6b541e1314 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java @@ -32,7 +32,7 @@ public abstract class OutboundTransformer { this.vendor = vendor; } - public abstract byte[] transform(Message jms) throws Exception; + public abstract EncodedMessage transform(Message jms) throws Exception; public String getPrefixVendor() { return prefixVendor;