mirror of https://github.com/apache/activemq.git
Improving the AQMP<-->JMS message mapping impl.
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1394264 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
205699e11e
commit
cac8c9c385
|
@ -64,4 +64,14 @@ public class ActiveMQJMSVendor extends JMSVendor {
|
||||||
public void setJMSXGroupSequence(Message msg, int value) {
|
public void setJMSXGroupSequence(Message msg, int value) {
|
||||||
((ActiveMQMessage)msg).setGroupSequence(value);
|
((ActiveMQMessage)msg).setGroupSequence(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setJMSXDeliveryCount(Message msg, long value) {
|
||||||
|
((ActiveMQMessage)msg).setRedeliveryCounter((int) value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toAddress(Destination dest) {
|
||||||
|
return ((ActiveMQDestination)dest).getQualifiedName();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -289,7 +289,7 @@ class AmqpProtocolConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
||||||
|
|
||||||
class ProducerContext extends AmqpDeliveryListener {
|
class ProducerContext extends AmqpDeliveryListener {
|
||||||
private final ProducerId producerId;
|
private final ProducerId producerId;
|
||||||
|
@ -322,7 +322,8 @@ class AmqpProtocolConverter {
|
||||||
}
|
}
|
||||||
|
|
||||||
final Buffer buffer = current.toBuffer();
|
final Buffer buffer = current.toBuffer();
|
||||||
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
|
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
|
||||||
|
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
|
||||||
current = null;
|
current = null;
|
||||||
|
|
||||||
if( message.getDestination()==null ) {
|
if( message.getDestination()==null ) {
|
||||||
|
@ -365,7 +366,7 @@ class AmqpProtocolConverter {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
OutboundTransformer outboundTransformer = new AMQPNativeOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
|
||||||
|
|
||||||
class ConsumerContext extends AmqpDeliveryListener {
|
class ConsumerContext extends AmqpDeliveryListener {
|
||||||
private final ConsumerId consumerId;
|
private final ConsumerId consumerId;
|
||||||
|
@ -432,17 +433,19 @@ class AmqpProtocolConverter {
|
||||||
}
|
}
|
||||||
|
|
||||||
final MessageDispatch md = outbound.removeFirst();
|
final MessageDispatch md = outbound.removeFirst();
|
||||||
final byte[] tag = nextTag();
|
|
||||||
final Delivery delivery = sender.delivery(tag, 0, tag.length);
|
|
||||||
delivery.setContext(md);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
|
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
|
||||||
final byte[] amqpMessage = outboundTransformer.transform(jms);
|
final EncodedMessage amqp = outboundTransformer.transform(jms);
|
||||||
if( amqpMessage!=null && amqpMessage.length > 0 ) {
|
if( amqp!=null && amqp.getLength() > 0 ) {
|
||||||
current = new Buffer(amqpMessage);
|
|
||||||
|
current = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
|
||||||
|
final byte[] tag = nextTag();
|
||||||
|
final Delivery delivery = sender.delivery(tag, 0, tag.length);
|
||||||
|
delivery.setContext(md);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// TODO: message could not be generated what now?
|
// TODO: message could not be generated what now?
|
||||||
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
|
|
@ -30,10 +30,10 @@ public class AMQPNativeInboundTransformer extends InboundTransformer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
|
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||||
|
|
||||||
BytesMessage rc = vendor.createBytesMessage();
|
BytesMessage rc = vendor.createBytesMessage();
|
||||||
rc.writeBytes(amqpMessage, offset, len);
|
rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
|
||||||
|
|
||||||
rc.setJMSDeliveryMode(defaultDeliveryMode);
|
rc.setJMSDeliveryMode(defaultDeliveryMode);
|
||||||
rc.setJMSPriority(defaultPriority);
|
rc.setJMSPriority(defaultPriority);
|
||||||
|
@ -44,7 +44,7 @@ public class AMQPNativeInboundTransformer extends InboundTransformer {
|
||||||
rc.setJMSExpiration(now + defaultTtl);
|
rc.setJMSExpiration(now + defaultTtl);
|
||||||
}
|
}
|
||||||
|
|
||||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
|
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||||
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
|
rc.setBooleanProperty(prefixVendor + "NATIVE", true);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package org.apache.activemq.transport.amqp.transform;
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageFormatException;
|
import javax.jms.MessageFormatException;
|
||||||
|
|
||||||
|
@ -30,30 +31,31 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] transform(Message jms) throws Exception {
|
public EncodedMessage transform(Message msg) throws Exception {
|
||||||
if( jms == null )
|
if( msg == null )
|
||||||
return null;
|
return null;
|
||||||
if( !(jms instanceof BytesMessage) )
|
if( !(msg instanceof BytesMessage) )
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
long messageFormat;
|
|
||||||
try {
|
try {
|
||||||
if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
|
|
||||||
} catch (MessageFormatException e) {
|
} catch (MessageFormatException e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
return transform(this, (BytesMessage) msg);
|
||||||
// TODO: Proton should probably expose a way to set the msg format
|
|
||||||
// delivery.settMessageFormat(messageFormat);
|
|
||||||
|
|
||||||
BytesMessage bytesMessage = (BytesMessage) jms;
|
|
||||||
byte data[] = new byte[(int) bytesMessage.getBodyLength()];
|
|
||||||
bytesMessage.readBytes(data);
|
|
||||||
return data;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EncodedMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
|
||||||
|
long messageFormat;
|
||||||
|
try {
|
||||||
|
messageFormat = msg.getLongProperty(options.prefixVendor + "MESSAGE_FORMAT");
|
||||||
|
} catch (MessageFormatException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
byte data[] = new byte[(int) msg.getBodyLength()];
|
||||||
|
msg.readBytes(data);
|
||||||
|
return new EncodedMessage(messageFormat, data, 0, data.length);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageFormatException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
public class AutoOutboundTransformer extends JMSMappingOutboundTransformer {
|
||||||
|
|
||||||
|
public AutoOutboundTransformer(JMSVendor vendor) {
|
||||||
|
super(vendor);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EncodedMessage transform(Message msg) throws Exception {
|
||||||
|
if( msg == null )
|
||||||
|
return null;
|
||||||
|
if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
||||||
|
if( msg instanceof BytesMessage ) {
|
||||||
|
return AMQPNativeOutboundTransformer.transform(this, (BytesMessage)msg);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return JMSMappingOutboundTransformer.transform(this, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
|
import org.apache.qpid.proton.codec.WritableBuffer;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
public class DroppingWritableBuffer implements WritableBuffer
|
||||||
|
{
|
||||||
|
int pos = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasRemaining() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(byte b) {
|
||||||
|
pos += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putFloat(float f) {
|
||||||
|
pos += 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putDouble(double d) {
|
||||||
|
pos += 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(byte[] src, int offset, int length) {
|
||||||
|
pos += length;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putShort(short s) {
|
||||||
|
pos += 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putInt(int i) {
|
||||||
|
pos += 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void putLong(long l) {
|
||||||
|
pos += 8;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int remaining() {
|
||||||
|
return Integer.MAX_VALUE - pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int position() {
|
||||||
|
return pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void position(int position) {
|
||||||
|
pos = position;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(ByteBuffer payload) {
|
||||||
|
pos += payload.remaining();
|
||||||
|
payload.position(payload.limit());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,36 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
|
import org.apache.qpid.proton.type.Binary;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
|
*/
|
||||||
|
public class EncodedMessage extends Binary {
|
||||||
|
|
||||||
|
final long messageFormat;
|
||||||
|
|
||||||
|
public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
|
||||||
|
super(data, offset, length);
|
||||||
|
this.messageFormat = messageFormat;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getMessageFormat() {
|
||||||
|
return messageFormat;
|
||||||
|
}
|
||||||
|
}
|
|
@ -36,7 +36,7 @@ public abstract class InboundTransformer {
|
||||||
this.vendor = vendor;
|
this.vendor = vendor;
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract public Message transform(long messageFormat, byte [] data, int offset, int len) throws Exception;
|
abstract public Message transform(EncodedMessage amqpMessage) throws Exception;
|
||||||
|
|
||||||
public int getDefaultDeliveryMode() {
|
public int getDefaultDeliveryMode() {
|
||||||
return defaultDeliveryMode;
|
return defaultDeliveryMode;
|
||||||
|
|
|
@ -38,11 +38,13 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Message transform(long messageFormat, byte [] amqpMessage, int offset, int len) throws Exception {
|
public Message transform(EncodedMessage amqpMessage) throws Exception {
|
||||||
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
|
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
|
||||||
|
|
||||||
|
int offset = amqpMessage.getArrayOffset();
|
||||||
|
int len = amqpMessage.getLength();
|
||||||
while( len > 0 ) {
|
while( len > 0 ) {
|
||||||
final int decoded = amqp.decode(amqpMessage, offset, len);
|
final int decoded = amqp.decode(amqpMessage.getArray(), offset, len);
|
||||||
assert decoded > 0: "Make progress decoding the message";
|
assert decoded > 0: "Make progress decoding the message";
|
||||||
offset += decoded;
|
offset += decoded;
|
||||||
len -= decoded;
|
len -= decoded;
|
||||||
|
@ -110,7 +112,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||||
rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
|
rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
|
||||||
}
|
}
|
||||||
if( header.getDeliveryCount()!=null ) {
|
if( header.getDeliveryCount()!=null ) {
|
||||||
rc.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue());
|
vendor.setJMSXDeliveryCount(rc, header.getDeliveryCount().longValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +189,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
|
rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
|
||||||
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
|
rc.setBooleanProperty(prefixVendor + "NATIVE", false);
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,43 +16,181 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.transport.amqp.transform;
|
package org.apache.activemq.transport.amqp.transform;
|
||||||
|
|
||||||
import javax.jms.BytesMessage;
|
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
|
||||||
import javax.jms.Message;
|
import org.apache.qpid.proton.codec.WritableBuffer;
|
||||||
import javax.jms.MessageFormatException;
|
import org.apache.qpid.proton.type.Binary;
|
||||||
|
import org.apache.qpid.proton.type.Symbol;
|
||||||
|
import org.apache.qpid.proton.type.UnsignedByte;
|
||||||
|
import org.apache.qpid.proton.type.UnsignedInteger;
|
||||||
|
import org.apache.qpid.proton.type.messaging.*;
|
||||||
|
|
||||||
|
import javax.jms.*;
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.HashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
|
||||||
*/
|
*/
|
||||||
public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
||||||
|
|
||||||
|
String prefixDeliveryAnnotations = "DA_";
|
||||||
|
String prefixMessageAnnotations= "MA_";
|
||||||
|
String prefixFooter = "FT_";
|
||||||
|
|
||||||
public JMSMappingOutboundTransformer(JMSVendor vendor) {
|
public JMSMappingOutboundTransformer(JMSVendor vendor) {
|
||||||
super(vendor);
|
super(vendor);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] transform(Message jms) throws Exception {
|
public EncodedMessage transform(Message msg) throws Exception {
|
||||||
if( jms == null )
|
if( msg == null )
|
||||||
return null;
|
return null;
|
||||||
if( !(jms instanceof BytesMessage) )
|
try {
|
||||||
|
if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} catch (MessageFormatException e) {
|
||||||
return null;
|
return null;
|
||||||
|
}
|
||||||
|
return transform(this, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static EncodedMessage transform(JMSMappingOutboundTransformer options, Message msg) throws JMSException, UnsupportedEncodingException {
|
||||||
|
final JMSVendor vendor = options.vendor;
|
||||||
|
|
||||||
|
final String messageFormatKey = options.prefixVendor + "MESSAGE_FORMAT";
|
||||||
|
final String nativeKey = options.prefixVendor + "NATIVE";
|
||||||
|
final String firstAcquirerKey = options.prefixVendor + "FirstAcquirer";
|
||||||
|
final String prefixDeliveryAnnotationsKey = options.prefixVendor + options.prefixDeliveryAnnotations;
|
||||||
|
final String prefixMessageAnnotationsKey = options.prefixVendor + options.prefixMessageAnnotations;
|
||||||
|
final String subjectKey = options.prefixVendor +"Subject";
|
||||||
|
final String contentTypeKey = options.prefixVendor +"ContentType";
|
||||||
|
final String contentEncodingKey = options.prefixVendor +"ContentEncoding";
|
||||||
|
final String replyToGroupIDKey = options.prefixVendor +"ReplyToGroupID";
|
||||||
|
final String prefixFooterKey = options.prefixVendor + options.prefixFooter;
|
||||||
|
|
||||||
long messageFormat;
|
long messageFormat;
|
||||||
try {
|
try {
|
||||||
if( !jms.getBooleanProperty(prefixVendor + "NATIVE") ) {
|
messageFormat = msg.getLongProperty(messageFormatKey);
|
||||||
return null;
|
|
||||||
}
|
|
||||||
messageFormat = jms.getLongProperty(prefixVendor + "MESSAGE_FORMAT");
|
|
||||||
} catch (MessageFormatException e) {
|
} catch (MessageFormatException e) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Proton should probably expose a way to set the msg format
|
Header header = new Header();
|
||||||
// delivery.settMessageFormat(messageFormat);
|
Properties props=new Properties();
|
||||||
|
HashMap daMap = null;
|
||||||
|
HashMap maMap = null;
|
||||||
|
HashMap apMap = null;
|
||||||
|
Section body=null;
|
||||||
|
HashMap footerMap = null;
|
||||||
|
if( msg instanceof BytesMessage ) {
|
||||||
|
BytesMessage m = (BytesMessage)msg;
|
||||||
|
byte data[] = new byte[(int) m.getBodyLength()];
|
||||||
|
m.readBytes(data);
|
||||||
|
body = new Data(new Binary(data));
|
||||||
|
} if( msg instanceof TextMessage ) {
|
||||||
|
body = new AmqpValue(((TextMessage) msg).getText());
|
||||||
|
} if( msg instanceof MapMessage ) {
|
||||||
|
throw new RuntimeException("Not implemented");
|
||||||
|
} if( msg instanceof StreamMessage ) {
|
||||||
|
throw new RuntimeException("Not implemented");
|
||||||
|
} if( msg instanceof ObjectMessage ) {
|
||||||
|
throw new RuntimeException("Not implemented");
|
||||||
|
}
|
||||||
|
|
||||||
BytesMessage bytesMessage = (BytesMessage) jms;
|
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
|
||||||
byte data[] = new byte[(int) bytesMessage.getBodyLength()];
|
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
|
||||||
bytesMessage.readBytes(data);
|
if( msg.getJMSExpiration() != 0 ) {
|
||||||
return data;
|
header.setTtl(new UnsignedInteger((int) msg.getJMSExpiration()));
|
||||||
|
}
|
||||||
|
if( msg.getJMSType()!=null ) {
|
||||||
|
if( maMap==null ) maMap = new HashMap();
|
||||||
|
maMap.put("x-opt-jms-type", msg.getJMSType());
|
||||||
|
}
|
||||||
|
if( msg.getJMSMessageID()!=null ) {
|
||||||
|
props.setMessageId(msg.getJMSMessageID());
|
||||||
|
}
|
||||||
|
if( msg.getJMSDestination()!=null ) {
|
||||||
|
props.setTo(vendor.toAddress(msg.getJMSDestination()));
|
||||||
|
}
|
||||||
|
if( msg.getJMSReplyTo()!=null ) {
|
||||||
|
props.setReplyTo(vendor.toAddress(msg.getJMSDestination()));
|
||||||
|
}
|
||||||
|
if( msg.getJMSCorrelationID()!=null ) {
|
||||||
|
props.setCorrelationId(msg.getJMSCorrelationID());
|
||||||
|
}
|
||||||
|
if( msg.getJMSExpiration() != 0 ) {
|
||||||
|
props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
|
||||||
|
}
|
||||||
|
if( msg.getJMSTimestamp()!= 0 ) {
|
||||||
|
props.setCreationTime(new Date(msg.getJMSTimestamp()));
|
||||||
|
}
|
||||||
|
|
||||||
|
final Enumeration keys = msg.getPropertyNames();
|
||||||
|
while (keys.hasMoreElements()) {
|
||||||
|
String key = (String) keys.nextElement();
|
||||||
|
if( key.equals(messageFormatKey) || key.equals(nativeKey)) {
|
||||||
|
// skip..
|
||||||
|
} else if( key.equals(firstAcquirerKey) ) {
|
||||||
|
header.setFirstAcquirer(msg.getBooleanProperty(key));
|
||||||
|
} else if( key.startsWith("JMSXDeliveryCount") ) {
|
||||||
|
header.setDeliveryCount(new UnsignedInteger(msg.getIntProperty(key)));
|
||||||
|
} else if( key.startsWith("JMSXUserID") ) {
|
||||||
|
props.setUserId(new Binary(msg.getStringProperty(key).getBytes("UTF-8")));
|
||||||
|
} else if( key.startsWith("JMSXGroupID") ) {
|
||||||
|
props.setGroupId(msg.getStringProperty(key));
|
||||||
|
} else if( key.startsWith("JMSXGroupSeq") ) {
|
||||||
|
props.setGroupSequence(new UnsignedInteger(msg.getIntProperty(key)));
|
||||||
|
} else if( key.startsWith(prefixDeliveryAnnotationsKey) ) {
|
||||||
|
if( daMap == null ) daMap = new HashMap();
|
||||||
|
String name = key.substring(prefixDeliveryAnnotationsKey.length());
|
||||||
|
daMap.put(name, msg.getObjectProperty(key));
|
||||||
|
} else if( key.startsWith(prefixMessageAnnotationsKey) ) {
|
||||||
|
if( maMap==null ) maMap = new HashMap();
|
||||||
|
String name = key.substring(prefixMessageAnnotationsKey.length());
|
||||||
|
maMap.put(name, msg.getObjectProperty(key));
|
||||||
|
} else if( key.equals(subjectKey) ) {
|
||||||
|
props.setSubject(msg.getStringProperty(key));
|
||||||
|
} else if( key.equals(contentTypeKey) ) {
|
||||||
|
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
|
||||||
|
} else if( key.equals(contentEncodingKey) ) {
|
||||||
|
props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
|
||||||
|
} else if( key.equals(replyToGroupIDKey) ) {
|
||||||
|
props.setReplyToGroupId(msg.getStringProperty(key));
|
||||||
|
} else if( key.startsWith(prefixFooterKey) ) {
|
||||||
|
if( footerMap==null ) footerMap = new HashMap();
|
||||||
|
String name = key.substring(prefixFooterKey.length());
|
||||||
|
footerMap.put(name, msg.getObjectProperty(key));
|
||||||
|
} else {
|
||||||
|
if( apMap==null ) apMap = new HashMap();
|
||||||
|
apMap.put(key, msg.getObjectProperty(key));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
MessageAnnotations ma=null;
|
||||||
|
if( maMap!=null ) ma = new MessageAnnotations(maMap);
|
||||||
|
DeliveryAnnotations da=null;
|
||||||
|
if( daMap!=null ) da = new DeliveryAnnotations(daMap);
|
||||||
|
ApplicationProperties ap=null;
|
||||||
|
if( apMap!=null ) ap = new ApplicationProperties(apMap);
|
||||||
|
Footer footer=null;
|
||||||
|
if( footerMap!=null ) footer = new Footer(footerMap);
|
||||||
|
|
||||||
|
org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message(header, da, ma, props, ap, body, footer);
|
||||||
|
|
||||||
|
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
|
||||||
|
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
|
||||||
|
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
|
||||||
|
if( overflow.position() > 0 ) {
|
||||||
|
buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
|
||||||
|
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
|
||||||
|
}
|
||||||
|
|
||||||
|
return new EncodedMessage(messageFormat, buffer.array(), 0, c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,11 +19,16 @@ abstract public class JMSVendor {
|
||||||
|
|
||||||
public abstract MapMessage createMapMessage();
|
public abstract MapMessage createMapMessage();
|
||||||
|
|
||||||
public abstract void setJMSXUserID(Message jms, String value);
|
public abstract void setJMSXUserID(Message msg, String value);
|
||||||
|
|
||||||
public abstract Destination createDestination(String name);
|
public abstract Destination createDestination(String name);
|
||||||
|
|
||||||
public abstract void setJMSXGroupID(Message jms, String groupId);
|
public abstract void setJMSXGroupID(Message msg, String groupId);
|
||||||
|
|
||||||
|
public abstract void setJMSXGroupSequence(Message msg, int i);
|
||||||
|
|
||||||
|
public abstract void setJMSXDeliveryCount(Message rc, long l);
|
||||||
|
|
||||||
|
public abstract String toAddress(Destination msgDestination);
|
||||||
|
|
||||||
public abstract void setJMSXGroupSequence(Message jms, int i);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ public abstract class OutboundTransformer {
|
||||||
this.vendor = vendor;
|
this.vendor = vendor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public abstract byte[] transform(Message jms) throws Exception;
|
public abstract EncodedMessage transform(Message jms) throws Exception;
|
||||||
|
|
||||||
public String getPrefixVendor() {
|
public String getPrefixVendor() {
|
||||||
return prefixVendor;
|
return prefixVendor;
|
||||||
|
|
Loading…
Reference in New Issue