diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java index dad365dedc..5afa9955de 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java @@ -46,7 +46,6 @@ import org.apache.qpid.proton.amqp.UnsignedLong; * *
When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or * ulong but can't be converted into the indicated format, an exception will be thrown. - * */ public class AMQPMessageIdHelper { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java index a976240979..216daa9a34 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index 2223b5a18c..e883bcf955 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -207,7 +207,7 @@ public abstract class InboundTransformer { jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); } if (properties.getCorrelationId() != null) { - jms.setJMSCorrelationID(properties.getCorrelationId().toString()); + jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); } if (properties.getContentType() != null) { jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 7e6af2fed7..c9a94fa127 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -211,7 +211,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo())); } if (msg.getJMSCorrelationID() != null) { - props.setCorrelationId(msg.getJMSCorrelationID()); + String correlationId = msg.getJMSCorrelationID(); + try { + props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); + } catch (AmqpProtocolException e) { + props.setCorrelationId(correlationId); + } } if (msg.getJMSExpiration() != 0) { long ttl = msg.getJMSExpiration() - System.currentTimeMillis(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index d974690d67..b954e042ae 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -224,6 +224,58 @@ public class AmqpMessage { getWrappedMessage().setMessageId(messageId); } + /** + * Sets the CorrelationId property on an outbound message using the provided String + * + * @param correlationId + * the String Correlation ID value to set. + */ + public void setCorrelationId(String correlationId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCorrelationId(correlationId); + } + + /** + * Return the set CorrelationId value in String form, if there are no properties + * in the given message return null. + * + * @return the set correlation ID in String form or null if not set. + */ + public String getCorrelationId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getCorrelationId().toString(); + } + + /** + * Return the set CorrelationId value in the original form, if there are no properties + * in the given message return null. + * + * @return the set message ID in its original form or null if not set. + */ + public Object getRawCorrelationId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getCorrelationId(); + } + + /** + * Sets the CorrelationId property on an outbound message using the provided value + * + * @param correlationId + * the correlation ID value to set. + */ + public void setRawCorrelationId(Object correlationId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCorrelationId(correlationId); + } + /** * Sets the GroupId property on an outbound message using the provided String * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java new file mode 100644 index 0000000000..b155060498 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.transport.amqp.JMSInteroperabilityTest; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests that the AMQP CorrelationId value and type are preserved. + */ +@RunWith(Parameterized.class) +public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class); + + private final String transformer; + + @Parameters(name="Transformer->{0}") + public static Collection