diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 1f8bb367ea..80b47cc265 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -118,12 +118,14 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay"); + private static final Symbol JMS_MAPPING_VERSION = Symbol.valueOf("x-opt-jms-mapping-version"); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); protected int prefetch; protected Transport protonTransport = Proton.transport(); protected Connection protonConnection = Proton.connection(); protected Collector eventCollector = new CollectorImpl(); + protected boolean useByteDestinationTypeAnnotation; public AmqpProtocolConverter(AmqpTransport transport) { this.amqpTransport = transport; @@ -134,6 +136,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { this.protonTransport.setMaxFrameSize(maxFrameSize); } + useByteDestinationTypeAnnotation = transport.getWireFormat().isUseByteDestinationTypeAnnotation(); + this.protonTransport.bind(this.protonConnection); // NOTE: QPid JMS client has a bug where the channel max is stored as a @@ -456,6 +460,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { connectionInfo.setClientId(clientId); } + Map props = protonConnection.getRemoteProperties(); + if (props != null) { + if (props.containsKey(JMS_MAPPING_VERSION)) { + useByteDestinationTypeAnnotation = true; + } + } + + if (useByteDestinationTypeAnnotation) { + outboundTransformer.setUseByteDestinationTypeAnnotations(true); + } + connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); sendToActiveMQ(connectionInfo, new ResponseHandler() { @@ -529,6 +544,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { LOG.warn("Unknown transformer type {} using native one instead", transformer); inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); } + + if (useByteDestinationTypeAnnotation) { + inboundTransformer.setUseByteDestinationTypeAnnotations(true); + } } return inboundTransformer; } diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index f6c2880bf8..b58273d751 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -41,6 +41,7 @@ public class AmqpWireFormat implements WireFormat { private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; private String anonymousNodeName = "$relay"; + private boolean useByteDestinationTypeAnnotation = false; @Override public ByteSequence marshal(Object command) throws IOException { @@ -135,4 +136,12 @@ public class AmqpWireFormat implements WireFormat { public void setAnonymousNodeName(String anonymousNodeName) { this.anonymousNodeName = anonymousNodeName; } + + public boolean isUseByteDestinationTypeAnnotation() { + return useByteDestinationTypeAnnotation; + } + + public void setUseByteDestinationTypeAnnotation(boolean useByteDestinationTypeAnnotation) { + this.useByteDestinationTypeAnnotation = useByteDestinationTypeAnnotation; + } }