From 16c487a7b9ba54c013254a1c642d31eba86acc8b Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 12 Jul 2016 14:18:47 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6353 Fix and test for encoding the correlation Id value to the ActiveMQ message object's string value --- .../amqp/message/AMQPMessageIdHelper.java | 1 - .../amqp/message/ActiveMQJMSVendor.java | 2 +- .../amqp/message/InboundTransformer.java | 2 +- .../JMSMappingOutboundTransformer.java | 9 +- .../transport/amqp/client/AmqpMessage.java | 54 ++++- .../AmqpCorrelationIdPreservationTest.java | 197 ++++++++++++++++++ 6 files changed, 259 insertions(+), 6 deletions(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java index dad365dedc..5afa9955de 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPMessageIdHelper.java @@ -46,7 +46,6 @@ import org.apache.qpid.proton.amqp.UnsignedLong; * *

When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or * ulong but can't be converted into the indicated format, an exception will be thrown. - * */ public class AMQPMessageIdHelper { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java index a976240979..216daa9a34 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index 2223b5a18c..e883bcf955 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -207,7 +207,7 @@ public abstract class InboundTransformer { jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); } if (properties.getCorrelationId() != null) { - jms.setJMSCorrelationID(properties.getCorrelationId().toString()); + jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId())); } if (properties.getContentType() != null) { jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString()); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 7e6af2fed7..c9a94fa127 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -211,7 +211,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo())); } if (msg.getJMSCorrelationID() != null) { - props.setCorrelationId(msg.getJMSCorrelationID()); + String correlationId = msg.getJMSCorrelationID(); + try { + props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); + } catch (AmqpProtocolException e) { + props.setCorrelationId(correlationId); + } } if (msg.getJMSExpiration() != 0) { long ttl = msg.getJMSExpiration() - System.currentTimeMillis(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index d974690d67..b954e042ae 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -224,6 +224,58 @@ public class AmqpMessage { getWrappedMessage().setMessageId(messageId); } + /** + * Sets the CorrelationId property on an outbound message using the provided String + * + * @param correlationId + * the String Correlation ID value to set. + */ + public void setCorrelationId(String correlationId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCorrelationId(correlationId); + } + + /** + * Return the set CorrelationId value in String form, if there are no properties + * in the given message return null. + * + * @return the set correlation ID in String form or null if not set. + */ + public String getCorrelationId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getCorrelationId().toString(); + } + + /** + * Return the set CorrelationId value in the original form, if there are no properties + * in the given message return null. + * + * @return the set message ID in its original form or null if not set. + */ + public Object getRawCorrelationId() { + if (message.getProperties() == null) { + return null; + } + + return message.getProperties().getCorrelationId(); + } + + /** + * Sets the CorrelationId property on an outbound message using the provided value + * + * @param correlationId + * the correlation ID value to set. + */ + public void setRawCorrelationId(Object correlationId) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCorrelationId(correlationId); + } + /** * Sets the GroupId property on an outbound message using the provided String * diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java new file mode 100644 index 0000000000..b155060498 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpCorrelationIdPreservationTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.Arrays; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.transport.amqp.JMSInteroperabilityTest; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.UnsignedLong; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests that the AMQP CorrelationId value and type are preserved. + */ +@RunWith(Parameterized.class) +public class AmqpCorrelationIdPreservationTest extends AmqpClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class); + + private final String transformer; + + @Parameters(name="Transformer->{0}") + public static Collection data() { + return Arrays.asList(new Object[][] { + {"jms"}, + {"native"}, + {"raw"}, + }); + } + + public AmqpCorrelationIdPreservationTest(String transformer) { + this.transformer = transformer; + } + + @Override + protected String getAmqpTransformer() { + return transformer; + } + + @Override + protected boolean isPersistent() { + return true; + } + + @Test(timeout = 60000) + public void testStringCorrelationIdIsPreserved() throws Exception { + doTestCorrelationIdPreservation("msg-id-string:1"); + } + + @Test(timeout = 60000) + public void testStringCorrelationIdIsPreservedAfterRestart() throws Exception { + doTestCorrelationIdPreservationOnBrokerRestart("msg-id-string:1"); + } + + @Test(timeout = 60000) + public void testUUIDCorrelationIdIsPreserved() throws Exception { + doTestCorrelationIdPreservation(UUID.randomUUID()); + } + + @Test(timeout = 60000) + public void testUUIDCorrelationIdIsPreservedAfterRestart() throws Exception { + doTestCorrelationIdPreservationOnBrokerRestart(UUID.randomUUID()); + } + + @Test(timeout = 60000) + public void testUnsignedLongCorrelationIdIsPreserved() throws Exception { + doTestCorrelationIdPreservation(new UnsignedLong(255l)); + } + + @Test(timeout = 60000) + public void testUnsignedLongCorrelationIdIsPreservedAfterRestart() throws Exception { + doTestCorrelationIdPreservationOnBrokerRestart(new UnsignedLong(255l)); + } + + @Test(timeout = 60000) + public void testBinaryLongCorrelationIdIsPreserved() throws Exception { + byte[] payload = new byte[32]; + for (int i = 0; i < 32; ++i) { + payload[i] = (byte) ('a' + i); + } + + doTestCorrelationIdPreservation(new Binary(payload)); + } + + @Test(timeout = 60000) + public void testBinaryLongCorrelationIdIsPreservedAfterRestart() throws Exception { + byte[] payload = new byte[32]; + for (int i = 0; i < 32; ++i) { + payload[i] = (byte) ('a' + i); + } + + doTestCorrelationIdPreservationOnBrokerRestart(new Binary(payload)); + } + + @Test(timeout = 60000) + public void testStringCorrelationIdPrefixIsPreserved() throws Exception { + doTestCorrelationIdPreservation("ID:msg-id-string:1"); + } + + public void doTestCorrelationIdPreservation(Object messageId) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + AmqpMessage message = new AmqpMessage(); + + message.setRawCorrelationId(messageId); + message.setText("Test-Message"); + + sender.send(message); + + sender.close(); + + QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(1, queue.getQueueSize()); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Should have got a message", received); + assertEquals(received.getRawCorrelationId().getClass(), messageId.getClass()); + assertEquals(messageId, received.getRawCorrelationId()); + receiver.close(); + connection.close(); + } + + public void doTestCorrelationIdPreservationOnBrokerRestart(Object messageId) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender("queue://" + getTestName()); + + AmqpMessage message = new AmqpMessage(); + + message.setRawCorrelationId(messageId); + message.setText("Test-Message"); + message.setDurable(true); + + sender.send(message); + + sender.close(); + connection.close(); + + restartBroker(); + + QueueViewMBean queue = getProxyToQueue(getTestName()); + assertEquals(1, queue.getQueueSize()); + + connection = client.connect(); + session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver("queue://" + getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull("Should have got a message", received); + assertEquals(received.getRawCorrelationId().getClass(), messageId.getClass()); + assertEquals(messageId, received.getRawCorrelationId()); + receiver.close(); + connection.close(); + } +}