From b8a6e5a6ca4e8ed6eaf342f0488be007d107e2b1 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Thu, 25 Oct 2012 13:36:09 +0000 Subject: [PATCH] amqp - support configurable transformers and populate message properties for the default native one git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1402148 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/amqp/AmqpProtocolConverter.java | 24 +++- .../transport/amqp/AmqpTransport.java | 2 + .../transport/amqp/AmqpTransportFilter.java | 9 ++ .../AMQPNativeInboundTransformer.java | 18 +-- .../transform/AMQPRawInboundTransformer.java | 47 +++++++ .../amqp/transform/EncodedMessage.java | 16 +++ .../amqp/transform/InboundTransformer.java | 127 ++++++++++++++++++ .../JMSMappingInboundTransformer.java | 120 +---------------- 8 files changed, 227 insertions(+), 136 deletions(-) create mode 100644 activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 534fa4204b..8ef5fb4be6 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -363,8 +363,24 @@ class AmqpProtocolConverter { } } - //InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); - InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); + InboundTransformer inboundTransformer; + + protected InboundTransformer getInboundTransformer() { + if (inboundTransformer == null) { + String transformer = amqpTransport.getTransformer(); + if (transformer.equals(InboundTransformer.TRANSFORMER_JMS)) { + inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); + } else if (transformer.equals(InboundTransformer.TRANSFORMER_NATIVE)) { + inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + } else if (transformer.equals(InboundTransformer.TRANSFORMER_RAW)) { + inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE); + } else { + LOG.warn("Unknown transformer type " + transformer + ", using native one instead"); + inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); + } + } + return inboundTransformer; + } abstract class BaseProducerContext extends AmqpDeliveryListener { @@ -419,7 +435,7 @@ class AmqpProtocolConverter { @Override protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception { EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length); - final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em); + final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em); current = null; if( message.getDestination()==null ) { @@ -587,7 +603,7 @@ class AmqpProtocolConverter { private Source createSource(ActiveMQDestination dest) { org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source(); - rc.setAddress(inboundTransformer.getVendor().toAddress(dest)); + rc.setAddress(getInboundTransformer().getVendor().toAddress(dest)); return rc; } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java index a8ceaaa3a2..02afe0b883 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java @@ -40,4 +40,6 @@ public interface AmqpTransport { public AmqpWireFormat getWireFormat(); public void stop() throws Exception; + + public String getTransformer(); } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index f9810e82ab..e334bb1b84 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -21,6 +21,7 @@ import org.apache.activemq.command.Command; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.TransportListener; +import org.apache.activemq.transport.amqp.transform.InboundTransformer; import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.wireformat.WireFormat; @@ -46,6 +47,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor private AmqpWireFormat wireFormat; private boolean trace; + private String transformer = InboundTransformer.TRANSFORMER_NATIVE; public AmqpTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) { super(next); @@ -161,4 +163,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor } + public String getTransformer() { + return transformer; + } + + public void setTransformer(String transformer) { + this.transformer = transformer; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java index e80b9c2520..7867244d5b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java @@ -22,7 +22,7 @@ import javax.jms.Message; /** * @author Hiram Chirino */ -public class AMQPNativeInboundTransformer extends InboundTransformer { +public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer { public AMQPNativeInboundTransformer(JMSVendor vendor) { @@ -31,21 +31,11 @@ public class AMQPNativeInboundTransformer extends InboundTransformer { @Override public Message transform(EncodedMessage amqpMessage) throws Exception { + org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); - BytesMessage rc = vendor.createBytesMessage(); - rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); + Message rc = super.transform(amqpMessage); - rc.setJMSDeliveryMode(defaultDeliveryMode); - rc.setJMSPriority(defaultPriority); - - final long now = System.currentTimeMillis(); - rc.setJMSTimestamp(now); - if( defaultTtl > 0 ) { - rc.setJMSExpiration(now + defaultTtl); - } - - rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); - rc.setBooleanProperty(prefixVendor + "NATIVE", true); + populateMessage(rc, amqp); return rc; } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java new file mode 100644 index 0000000000..c7590c2475 --- /dev/null +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.transform; + +import javax.jms.BytesMessage; +import javax.jms.Message; + +public class AMQPRawInboundTransformer extends InboundTransformer { + + public AMQPRawInboundTransformer(JMSVendor vendor) { + super(vendor); + } + + @Override + public Message transform(EncodedMessage amqpMessage) throws Exception { + BytesMessage rc = vendor.createBytesMessage(); + rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength()); + + rc.setJMSDeliveryMode(defaultDeliveryMode); + rc.setJMSPriority(defaultPriority); + + final long now = System.currentTimeMillis(); + rc.setJMSTimestamp(now); + if( defaultTtl > 0 ) { + rc.setJMSExpiration(now + defaultTtl); + } + + rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); + rc.setBooleanProperty(prefixVendor + "NATIVE", true); + + return rc; + } +} diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java index 229a0f80a5..8ada75034a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp.transform; +import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.type.Binary; /** @@ -33,4 +34,19 @@ public class EncodedMessage extends Binary { public long getMessageFormat() { return messageFormat; } + + public Message decode() throws Exception { + Message amqp = new Message(); + + int offset = getArrayOffset(); + int len = getLength(); + while( len > 0 ) { + final int decoded = amqp.decode(getArray(), offset, len); + assert decoded > 0: "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + return amqp; + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java index 5870a84963..72b1042016 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java @@ -17,9 +17,20 @@ package org.apache.activemq.transport.amqp.transform; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.type.Binary; +import org.apache.qpid.proton.type.messaging.ApplicationProperties; +import org.apache.qpid.proton.type.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.type.messaging.Footer; +import org.apache.qpid.proton.type.messaging.Header; +import org.apache.qpid.proton.type.messaging.MessageAnnotations; +import org.apache.qpid.proton.type.messaging.Properties; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import java.io.IOException; +import java.util.Map; +import java.util.Set; /** * @author Hiram Chirino @@ -27,7 +38,16 @@ import java.io.IOException; public abstract class InboundTransformer { JMSVendor vendor; + + public static final String TRANSFORMER_NATIVE = "native"; + public static final String TRANSFORMER_RAW = "raw"; + public static final String TRANSFORMER_JMS = "jms"; + String prefixVendor = "JMS_AMQP_"; + String prefixDeliveryAnnotations = "DA_"; + String prefixMessageAnnotations= "MA_"; + String prefixFooter = "FT_"; + int defaultDeliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE; int defaultPriority = javax.jms.Message.DEFAULT_PRIORITY; long defaultTtl = javax.jms.Message.DEFAULT_TIME_TO_LIVE; @@ -77,4 +97,111 @@ public abstract class InboundTransformer { public void setVendor(JMSVendor vendor) { this.vendor = vendor; } + + protected void populateMessage(Message jms, org.apache.qpid.proton.message.Message amqp) throws Exception { + final Header header = amqp.getHeader(); + if( header!=null ) { + if( header.getDurable()!=null ) { + jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + } + if( header.getPriority()!=null ) { + jms.setJMSPriority(header.getPriority().intValue()); + } + if( header.getTtl()!=null ) { + jms.setJMSExpiration(header.getTtl().longValue()); + } + if( header.getFirstAcquirer() !=null ) { + jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); + } + if( header.getDeliveryCount()!=null ) { + vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); + } + } + + final DeliveryAnnotations da = amqp.getDeliveryAnnotations(); + if( da!=null ) { + for (Map.Entry entry : (Set)da.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue()); + } + } + + final MessageAnnotations ma = amqp.getMessageAnnotations(); + if( ma!=null ) { + for (Map.Entry entry : (Set)ma.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); + } + } + + final Properties properties = amqp.getProperties(); + if( properties!=null ) { + if( properties.getMessageId()!=null ) { + jms.setJMSMessageID(properties.getMessageId().toString()); + } + Binary userId = properties.getUserId(); + if( userId!=null ) { + vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); + } + if( properties.getTo()!=null ) { + jms.setJMSDestination(vendor.createDestination(properties.getTo())); + } + if( properties.getSubject()!=null ) { + jms.setStringProperty(prefixVendor + "Subject", properties.getSubject()); + } + if( properties.getReplyTo() !=null ) { + jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); + } + if( properties.getCorrelationId() !=null ) { + jms.setJMSCorrelationID(properties.getCorrelationId().toString()); + } + if( properties.getContentType() !=null ) { + jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); + } + if( properties.getContentEncoding() !=null ) { + jms.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); + } + if( properties.getCreationTime()!=null ) { + jms.setJMSTimestamp(properties.getCreationTime().getTime()); + } + if( properties.getGroupId()!=null ) { + vendor.setJMSXGroupID(jms, properties.getGroupId()); + } + if( properties.getGroupSequence()!=null ) { + vendor.setJMSXGroupSequence(jms, properties.getGroupSequence().intValue()); + } + if( properties.getReplyToGroupId()!=null ) { + jms.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); + } + } + + final ApplicationProperties ap = amqp.getApplicationProperties(); + if( ap !=null ) { + for (Map.Entry entry : (Set)ap.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, key, entry.getValue()); + } + } + + final Footer fp = amqp.getFooter(); + if( fp !=null ) { + for (Map.Entry entry : (Set)fp.getValue().entrySet()) { + String key = entry.getKey().toString(); + setProperty(jms, prefixVendor + prefixFooter + key, entry.getValue()); + } + } + } + + private void setProperty(Message msg, String key, Object value) throws JMSException { + //TODO support all types + if( value instanceof String ) { + msg.setStringProperty(key, (String) value); + } else if( value instanceof Integer ) { + msg.setIntProperty(key, ((Integer) value).intValue()); + } else if( value instanceof Long ) { + msg.setLongProperty(key, ((Long) value).longValue()); + } else { + throw new RuntimeException("Unexpected value type: "+value.getClass()); + } + } } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java index 835265b244..50958b4b08 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java @@ -30,26 +30,13 @@ import java.util.Set; */ public class JMSMappingInboundTransformer extends InboundTransformer { - String prefixDeliveryAnnotations = "DA_"; - String prefixMessageAnnotations= "MA_"; - String prefixFooter = "FT_"; - public JMSMappingInboundTransformer(JMSVendor vendor) { super(vendor); } @Override public Message transform(EncodedMessage amqpMessage) throws Exception { - org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message(); - - int offset = amqpMessage.getArrayOffset(); - int len = amqpMessage.getLength(); - while( len > 0 ) { - final int decoded = amqp.decode(amqpMessage.getArray(), offset, len); - assert decoded > 0: "Make progress decoding the message"; - offset += decoded; - len -= decoded; - } + org.apache.qpid.proton.message.Message amqp = amqpMessage.decode(); Message rc; final Section body = amqp.getBody(); @@ -105,113 +92,10 @@ public class JMSMappingInboundTransformer extends InboundTransformer { rc.setJMSPriority(defaultPriority); rc.setJMSExpiration(defaultTtl); - final Header header = amqp.getHeader(); - if( header!=null ) { - if( header.getDurable()!=null ) { - rc.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - } - if( header.getPriority()!=null ) { - rc.setJMSPriority(header.getPriority().intValue()); - } - if( header.getTtl()!=null ) { - rc.setJMSExpiration(header.getTtl().longValue()); - } - if( header.getFirstAcquirer() !=null ) { - rc.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); - } - if( header.getDeliveryCount()!=null ) { - vendor.setJMSXDeliveryCount(rc, header.getDeliveryCount().longValue()); - } - } - - final DeliveryAnnotations da = amqp.getDeliveryAnnotations(); - if( da!=null ) { - for (Map.Entry entry : (Set)da.getValue().entrySet()) { - String key = entry.getKey().toString(); - setProperty(rc, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue()); - } - } - - final MessageAnnotations ma = amqp.getMessageAnnotations(); - if( ma!=null ) { - for (Map.Entry entry : (Set)ma.getValue().entrySet()) { - String key = entry.getKey().toString(); - setProperty(rc, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); - } - } - - final Properties properties = amqp.getProperties(); - if( properties!=null ) { - if( properties.getMessageId()!=null ) { - rc.setJMSMessageID(properties.getMessageId().toString()); - } - Binary userId = properties.getUserId(); - if( userId!=null ) { - vendor.setJMSXUserID(rc, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8")); - } - if( properties.getTo()!=null ) { - rc.setJMSDestination(vendor.createDestination(properties.getTo())); - } - if( properties.getSubject()!=null ) { - rc.setStringProperty(prefixVendor + "Subject", properties.getSubject()); - } - if( properties.getReplyTo() !=null ) { - rc.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); - } - if( properties.getCorrelationId() !=null ) { - rc.setJMSCorrelationID(properties.getCorrelationId().toString()); - } - if( properties.getContentType() !=null ) { - rc.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); - } - if( properties.getContentEncoding() !=null ) { - rc.setStringProperty(prefixVendor + "ContentEncoding", properties.getContentEncoding().toString()); - } - if( properties.getCreationTime()!=null ) { - rc.setJMSTimestamp(properties.getCreationTime().getTime()); - } - if( properties.getGroupId()!=null ) { - vendor.setJMSXGroupID(rc, properties.getGroupId()); - } - if( properties.getGroupSequence()!=null ) { - vendor.setJMSXGroupSequence(rc, properties.getGroupSequence().intValue()); - } - if( properties.getReplyToGroupId()!=null ) { - rc.setStringProperty(prefixVendor + "ReplyToGroupID", properties.getReplyToGroupId()); - } - } - - final ApplicationProperties ap = amqp.getApplicationProperties(); - if( ap !=null ) { - for (Map.Entry entry : (Set)ap.getValue().entrySet()) { - String key = entry.getKey().toString(); - setProperty(rc, key, entry.getValue()); - } - } - - final Footer fp = amqp.getFooter(); - if( fp !=null ) { - for (Map.Entry entry : (Set)fp.getValue().entrySet()) { - String key = entry.getKey().toString(); - setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue()); - } - } + populateMessage(rc, amqp); rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat()); rc.setBooleanProperty(prefixVendor + "NATIVE", false); return rc; } - - private void setProperty(Message msg, String key, Object value) throws JMSException { - //TODO support all types - if( value instanceof String ) { - msg.setStringProperty(key, (String) value); - } else if( value instanceof Integer ) { - msg.setIntProperty(key, ((Integer) value).intValue()); - } else if( value instanceof Long ) { - msg.setLongProperty(key, ((Long) value).longValue()); - } else { - throw new RuntimeException("Unexpected value type: "+value.getClass()); - } - } }