ARTEMIS-4657 support better correlation ID compat b/w JMS clients

This commit is contained in:
Justin Bertram 2024-02-27 11:25:51 -06:00 committed by Robbie Gemmell
parent 2e17a4a007
commit 50fae08b09
10 changed files with 306 additions and 33 deletions

View File

@ -21,6 +21,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;

View File

@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.SimpleType;
import java.lang.invoke.MethodHandles;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
@ -80,7 +81,6 @@ import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
@ -1562,7 +1562,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return getAMQPUserID();
case MessageUtil.CORRELATIONID_HEADER_NAME_STRING:
if (properties != null && properties.getCorrelationId() != null) {
return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
return AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId());
}
return null;
default:

View File

@ -113,7 +113,7 @@ public class AMQPMessageIdHelper {
}
}
public String toCorrelationIdString(Object idObject) {
public Object toCorrelationIdStringOrBytes(Object idObject) {
if (idObject instanceof String) {
final String stringId = (String) idObject;
@ -130,6 +130,11 @@ public class AMQPMessageIdHelper {
// It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
return stringId;
}
} else if (idObject instanceof Binary) {
ByteBuffer dup = ((Binary) idObject).asByteBuffer();
byte[] bytes = new byte[dup.remaining()];
dup.get(bytes);
return bytes;
} else {
// Not a string, convert it
return convertToIdString(idObject);

View File

@ -378,7 +378,7 @@ public class AmqpCoreConverter {
if (correlationID != null) {
try {
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID));
jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(correlationID));
} catch (IllegalArgumentException e) {
jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID));
}

View File

@ -159,6 +159,8 @@ public class CoreAmqpConverter {
} catch (ActiveMQAMQPIllegalStateException e) {
properties.setCorrelationId(correlationID);
}
} else if (correlationID instanceof byte[]) {
properties.setCorrelationId(new Binary(((byte[])correlationID)));
} else {
properties.setCorrelationId(correlationID);
}

View File

@ -18,6 +18,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.message;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -320,22 +321,22 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns null if given null
*/
@Test
public void testToCorrelationIdStringWithNull() {
assertNull("null string should have been returned", messageIdHelper.toCorrelationIdString(null));
assertNull("null string should have been returned", messageIdHelper.toCorrelationIdStringOrBytes(null));
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} throws
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} throws
* an IAE if given an unexpected object type.
*/
@Test
public void testToCorrelationIdStringThrowsIAEWithUnexpectedType() {
try {
messageIdHelper.toCorrelationIdString(new Object());
messageIdHelper.toCorrelationIdStringOrBytes(new Object());
fail("expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
@ -343,13 +344,19 @@ public class AMQPMessageIdHelperTest {
}
private void doToCorrelationIDTestImpl(Object idObject, String expected) {
String idString = messageIdHelper.toCorrelationIdString(idObject);
String idString = (String) messageIdHelper.toCorrelationIdStringOrBytes(idObject);
assertNotNull("null string should not have been returned", idString);
assertEquals("expected id string was not returned", expected, idString);
}
private void doToCorrelationIDBytesTestImpl(Object idObject, byte[] expected) {
byte[] idBytes = (byte[]) messageIdHelper.toCorrelationIdStringOrBytes(idObject);
assertNotNull("null byte[] should not have been returned", idBytes);
assertArrayEquals("expected id byte[] was not returned", expected, idBytes);
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns the given basic string unchanged when it has the "ID:" prefix (but
* no others).
*/
@ -361,7 +368,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns the given basic string unchanged when it lacks the "ID:" prefix
* (and any others)
*/
@ -373,7 +380,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
*/
@ -385,7 +392,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
*/
@ -397,7 +404,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
*/
@ -409,7 +416,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
*/
@ -421,7 +428,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string unchanged when it lacks the "ID:" prefix but happens to
* already begin with the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.
*/
@ -433,7 +440,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an AMQP encoded UUID when given a UUID object.
*/
@Test
@ -445,7 +452,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an AMQP encoded ulong when given a
* UnsignedLong object.
*/
@ -458,22 +465,27 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* returns a string indicating an AMQP encoded binary when given a Binary
* object.
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a byte[] when given a Binary object.
*/
@Test
public void testToCorrelationIdStringWithBinary() {
public void testToCorrelationIdByteArrayWithBinary() {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
Binary binary = new Binary(bytes);
String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF";
doToCorrelationIDBytesTestImpl(binary, bytes);
}
doToCorrelationIDTestImpl(binary, expected);
@Test
public void testToCorrelationIdByteArrayWithBinaryWithOffset() {
byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF};
Binary binary = new Binary(bytes, 2, 2);
doToCorrelationIDBytesTestImpl(binary, new byte[] {(byte) 0x09, (byte) 0xFF});
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}.
@ -487,7 +499,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}.
@ -501,7 +513,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}.
@ -515,7 +527,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}.
@ -529,7 +541,7 @@ public class AMQPMessageIdHelperTest {
}
/**
* Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)}
* Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)}
* returns a string indicating an escaped string, when given an input string
* that already has the "ID:" prefix, but follows it with an encoding prefix,
* in this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}.

View File

@ -28,6 +28,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
@ -159,7 +162,7 @@ public final class OpenWireMessageConverter {
coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
coreMessage.setCorrelationID(corrId);
}
final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
@ -590,9 +593,15 @@ public final class OpenWireMessageConverter {
}
amqMsg.setCommandId(commandId);
final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString());
final Object correlationID = coreMessage.getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) {
amqMsg.setCorrelationId(correlationID.toString());
} else if (correlationID instanceof byte[]) {
try {
amqMsg.setCorrelationId(StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap((byte[]) correlationID)).toString());
} catch (MalformedInputException e) {
ActiveMQServerLogger.LOGGER.unableToDecodeCorrelationId(e.getMessage());
}
}
final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, OpenWireConstants.AMQ_MSG_DATASTRUCTURE);
@ -944,6 +953,8 @@ public final class OpenWireMessageConverter {
}
if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
continue;
} else if (s.equals(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY)) {
continue;
}
final Object prop = coreMessage.getObjectProperty(s);
try {

View File

@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire;
import java.nio.charset.StandardCharsets;
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.server.MessageReference;
@ -182,6 +185,58 @@ public class OpenWireMessageConverterTest {
assertEquals(PRODUCER_ID, messageDispatch.getMessage().getProducerId().toString());
}
@Test
public void testStringCorrelationId() throws Exception {
final String CORRELATION_ID = RandomUtil.randomString();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.setCorrelationID(CORRELATION_ID);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(CORRELATION_ID, messageDispatch.getMessage().getCorrelationId());
}
@Test
public void testBytesCorrelationId() throws Exception {
final byte[] CORRELATION_ID = RandomUtil.randomString().getBytes(StandardCharsets.UTF_8);
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.setCorrelationID(CORRELATION_ID);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(new String(CORRELATION_ID, StandardCharsets.UTF_8), messageDispatch.getMessage().getCorrelationId());
}
@Test
public void testInvalidUtf8BytesCorrelationId() throws Exception {
final byte[] CORRELATION_ID = new byte[]{1, (byte)0xFF, (byte)0xFF};
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.setCorrelationID(CORRELATION_ID);
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertNull(messageDispatch.getMessage().getCorrelationId());
}
@Test
public void testLegacyCorrelationId() throws Exception {
final String CORRELATION_ID = RandomUtil.randomString();
ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(CORRELATION_ID));
MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0);
assertEquals(CORRELATION_ID, messageDispatch.getMessage().getCorrelationId());
}
@Test
public void testMessageId() throws Exception {
final String MESSAGE_ID = "ID:123:456:789";

View File

@ -1608,4 +1608,7 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224135, value = "nodeID {} is closing. Topology update ignored", level = LogMessage.Level.INFO)
void nodeLeavingCluster(String nodeID);
@LogMessage(id = 224136, value = "Skipping correlation ID when converting message to OpenWire since byte[] value is not valid UTF-8: {}", level = LogMessage.Level.WARN)
void unableToDecodeCorrelationId(String message);
}

View File

@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/*
* JMS supports setting the correlation ID as a String or a byte[]. However, OpenWire only supports correlation ID as
* a String. When it is set as a byte[] the OpenWire JMS client just converts it to a UTF-8 encoded String, and
* therefore when it sends a JMS message with a correlation ID the broker can't tell if the value was set as a String
* or a byte[]. Due to this ambiguity the broker is hard-coded to treat the incoming OpenWire value as a String. This
* doesn't cause any problems if the consumer is also OpenWire, but if the consumer is Core or AMQP (which both
* differentiate between String and binary values) then retrieving the correlation ID as a byte[] will fail and nothing
* can be done about it aside from updating the OpenWire protocol.
*
* Therefore, all the tests which involve the OpenWire JMS client using Message.setJMSCorrelationIDAsBytes() on a
* message sent to a different JMS implementation are ignored. The test are ignored rather that being completely
* removed to make clear this was an explicit decision not to test & support this use-case.
*/
public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport {
private void testCorrelationIDAsBytesSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
byte[] bytes = new byte[0xf + 1];
for (int i = 0; i <= 0xf; i++) {
bytes[i] = (byte) i;
}
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setJMSCorrelationIDAsBytes(bytes);
producer.send(message);
producer.close();
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
Message m = consumer.receive(5000);
Assert.assertNotNull("Could not receive message on consumer", m);
Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes());
}
@Test(timeout = 60000)
public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws Throwable {
testCorrelationIDAsBytesSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws Throwable {
testCorrelationIDAsBytesSendReceive(createConnection(), createCoreConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws Throwable {
testCorrelationIDAsBytesSendReceive(createConnection(), createOpenWireConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws Throwable {
testCorrelationIDAsBytesSendReceive(createCoreConnection(), createCoreConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws Throwable {
testCorrelationIDAsBytesSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws Throwable {
testCorrelationIDAsBytesSendReceive(createCoreConnection(), createOpenWireConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire() throws Throwable {
testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createOpenWireConnection());
}
@Test(timeout = 60000)
@Ignore
public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws Throwable {
testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createConnection());
}
@Test(timeout = 60000)
@Ignore
public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws Throwable {
testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createCoreConnection());
}
private void testCorrelationIDAsStringSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
final String correlationId = RandomUtil.randomString();
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setJMSCorrelationID(correlationId);
producer.send(message);
producer.close();
Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
Message m = consumer.receive(5000);
Assert.assertNotNull("Could not receive message on consumer", m);
Assert.assertEquals(correlationId, m.getJMSCorrelationID());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws Throwable {
testCorrelationIDAsStringSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws Throwable {
testCorrelationIDAsStringSendReceive(createConnection(), createCoreConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws Throwable {
testCorrelationIDAsStringSendReceive(createConnection(), createOpenWireConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws Throwable {
testCorrelationIDAsStringSendReceive(createCoreConnection(), createCoreConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws Throwable {
testCorrelationIDAsStringSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws Throwable {
testCorrelationIDAsStringSendReceive(createCoreConnection(), createOpenWireConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire() throws Throwable {
testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createOpenWireConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws Throwable {
testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createConnection());
}
@Test(timeout = 60000)
public void testCorrelationIDAsStringSendReceiveFromOpenWireToCore() throws Throwable {
testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createCoreConnection());
}
}