From 50fae08b09a76e200ef107d06cc867231f644ccd Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 27 Feb 2024 11:25:51 -0600 Subject: [PATCH] ARTEMIS-4657 support better correlation ID compat b/w JMS clients --- .../activemq/artemis/reader/MessageUtil.java | 1 + .../protocol/amqp/broker/AMQPMessage.java | 4 +- .../amqp/converter/AMQPMessageIdHelper.java | 7 +- .../amqp/converter/AmqpCoreConverter.java | 2 +- .../amqp/converter/CoreAmqpConverter.java | 2 + .../message/AMQPMessageIdHelperTest.java | 62 +++--- .../openwire/OpenWireMessageConverter.java | 19 +- .../OpenWireMessageConverterTest.java | 55 ++++++ .../core/server/ActiveMQServerLogger.java | 3 + .../multiprotocol/JMSCorrelationIDTest.java | 184 ++++++++++++++++++ 10 files changed, 306 insertions(+), 33 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java index 4e4d3e5928..780085f4fd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.Message; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 018d0a27e6..e10ae2b9e2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -20,6 +20,7 @@ import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.SimpleType; +import java.lang.invoke.MethodHandles; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -80,7 +81,6 @@ import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent; @@ -1562,7 +1562,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return getAMQPUserID(); case MessageUtil.CORRELATIONID_HEADER_NAME_STRING: if (properties != null && properties.getCorrelationId() != null) { - return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()); + return AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId()); } return null; default: diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java index baf08aeefb..06f97f1d3a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java @@ -113,7 +113,7 @@ public class AMQPMessageIdHelper { } } - public String toCorrelationIdString(Object idObject) { + public Object toCorrelationIdStringOrBytes(Object idObject) { if (idObject instanceof String) { final String stringId = (String) idObject; @@ -130,6 +130,11 @@ public class AMQPMessageIdHelper { // It has "ID:" prefix and doesn't have encoding prefix, use it as-is. return stringId; } + } else if (idObject instanceof Binary) { + ByteBuffer dup = ((Binary) idObject).asByteBuffer(); + byte[] bytes = new byte[dup.remaining()]; + dup.get(bytes); + return bytes; } else { // Not a string, convert it return convertToIdString(idObject); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java index 24f84474c0..416c88fb6a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java @@ -378,7 +378,7 @@ public class AmqpCoreConverter { if (correlationID != null) { try { - jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(correlationID)); + jms.getInnerMessage().setCorrelationID(AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(correlationID)); } catch (IllegalArgumentException e) { jms.getInnerMessage().setCorrelationID(String.valueOf(correlationID)); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java index 2d1a5d4bd8..0fe67e2da2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java @@ -159,6 +159,8 @@ public class CoreAmqpConverter { } catch (ActiveMQAMQPIllegalStateException e) { properties.setCorrelationId(correlationID); } + } else if (correlationID instanceof byte[]) { + properties.setCorrelationId(new Binary(((byte[])correlationID))); } else { properties.setCorrelationId(correlationID); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java index bd7192bb93..166418077a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelperTest.java @@ -18,6 +18,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -320,22 +321,22 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns null if given null */ @Test public void testToCorrelationIdStringWithNull() { - assertNull("null string should have been returned", messageIdHelper.toCorrelationIdString(null)); + assertNull("null string should have been returned", messageIdHelper.toCorrelationIdStringOrBytes(null)); } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} throws + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} throws * an IAE if given an unexpected object type. */ @Test public void testToCorrelationIdStringThrowsIAEWithUnexpectedType() { try { - messageIdHelper.toCorrelationIdString(new Object()); + messageIdHelper.toCorrelationIdStringOrBytes(new Object()); fail("expected exception not thrown"); } catch (IllegalArgumentException iae) { // expected @@ -343,13 +344,19 @@ public class AMQPMessageIdHelperTest { } private void doToCorrelationIDTestImpl(Object idObject, String expected) { - String idString = messageIdHelper.toCorrelationIdString(idObject); + String idString = (String) messageIdHelper.toCorrelationIdStringOrBytes(idObject); assertNotNull("null string should not have been returned", idString); assertEquals("expected id string was not returned", expected, idString); } + private void doToCorrelationIDBytesTestImpl(Object idObject, byte[] expected) { + byte[] idBytes = (byte[]) messageIdHelper.toCorrelationIdStringOrBytes(idObject); + assertNotNull("null byte[] should not have been returned", idBytes); + assertArrayEquals("expected id byte[] was not returned", expected, idBytes); + } + /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns the given basic string unchanged when it has the "ID:" prefix (but * no others). */ @@ -361,7 +368,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns the given basic string unchanged when it lacks the "ID:" prefix * (and any others) */ @@ -373,7 +380,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string unchanged when it lacks the "ID:" prefix but happens to * already begin with the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}. */ @@ -385,7 +392,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string unchanged when it lacks the "ID:" prefix but happens to * already begin with the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}. */ @@ -397,7 +404,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string unchanged when it lacks the "ID:" prefix but happens to * already begin with the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}. */ @@ -409,7 +416,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string unchanged when it lacks the "ID:" prefix but happens to * already begin with the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}. */ @@ -421,7 +428,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string unchanged when it lacks the "ID:" prefix but happens to * already begin with the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}. */ @@ -433,7 +440,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string indicating an AMQP encoded UUID when given a UUID object. */ @Test @@ -445,7 +452,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string indicating an AMQP encoded ulong when given a * UnsignedLong object. */ @@ -458,22 +465,27 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} - * returns a string indicating an AMQP encoded binary when given a Binary - * object. + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} + * returns a byte[] when given a Binary object. */ @Test - public void testToCorrelationIdStringWithBinary() { + public void testToCorrelationIdByteArrayWithBinary() { byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF}; Binary binary = new Binary(bytes); - String expected = AMQPMessageIdHelper.JMS_ID_PREFIX + AMQPMessageIdHelper.AMQP_BINARY_PREFIX + "00AB09FF"; + doToCorrelationIDBytesTestImpl(binary, bytes); + } - doToCorrelationIDTestImpl(binary, expected); + @Test + public void testToCorrelationIdByteArrayWithBinaryWithOffset() { + byte[] bytes = new byte[] {(byte) 0x00, (byte) 0xAB, (byte) 0x09, (byte) 0xFF}; + Binary binary = new Binary(bytes, 2, 2); + + doToCorrelationIDBytesTestImpl(binary, new byte[] {(byte) 0x09, (byte) 0xFF}); } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string indicating an escaped string, when given an input string * that already has the "ID:" prefix, but follows it with an encoding prefix, * in this case the {@link AMQPMessageIdHelper#AMQP_STRING_PREFIX}. @@ -487,7 +499,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string indicating an escaped string, when given an input string * that already has the "ID:" prefix, but follows it with an encoding prefix, * in this case the {@link AMQPMessageIdHelper#AMQP_UUID_PREFIX}. @@ -501,7 +513,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string indicating an escaped string, when given an input string * that already has the "ID:" prefix, but follows it with an encoding prefix, * in this case the {@link AMQPMessageIdHelper#AMQP_ULONG_PREFIX}. @@ -515,7 +527,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string indicating an escaped string, when given an input string * that already has the "ID:" prefix, but follows it with an encoding prefix, * in this case the {@link AMQPMessageIdHelper#AMQP_BINARY_PREFIX}. @@ -529,7 +541,7 @@ public class AMQPMessageIdHelperTest { } /** - * Test that {@link AMQPMessageIdHelper#toCorrelationIdString(Object)} + * Test that {@link AMQPMessageIdHelper#toCorrelationIdStringOrBytes(Object)} * returns a string indicating an escaped string, when given an input string * that already has the "ID:" prefix, but follows it with an encoding prefix, * in this case the {@link AMQPMessageIdHelper#AMQP_NO_PREFIX}. diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index e6d69757d3..617f076af9 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -28,6 +28,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.nio.charset.MalformedInputException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Map; import java.util.Map.Entry; @@ -159,7 +162,7 @@ public final class OpenWireMessageConverter { coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); final String corrId = messageSend.getCorrelationId(); if (corrId != null) { - coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId)); + coreMessage.setCorrelationID(corrId); } final DataStructure ds = messageSend.getDataStructure(); if (ds != null) { @@ -590,9 +593,15 @@ public final class OpenWireMessageConverter { } amqMsg.setCommandId(commandId); - final SimpleString corrId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.JMS_CORRELATION_ID_PROPERTY); - if (corrId != null) { - amqMsg.setCorrelationId(corrId.toString()); + final Object correlationID = coreMessage.getCorrelationID(); + if (correlationID instanceof String || correlationID instanceof SimpleString) { + amqMsg.setCorrelationId(correlationID.toString()); + } else if (correlationID instanceof byte[]) { + try { + amqMsg.setCorrelationId(StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap((byte[]) correlationID)).toString()); + } catch (MalformedInputException e) { + ActiveMQServerLogger.LOGGER.unableToDecodeCorrelationId(e.getMessage()); + } } final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, OpenWireConstants.AMQ_MSG_DATASTRUCTURE); @@ -944,6 +953,8 @@ public final class OpenWireMessageConverter { } if (!coreMessage.containsProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) { continue; + } else if (s.equals(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY)) { + continue; } final Object prop = coreMessage.getObjectProperty(s); try { diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java index 0580f5cad0..0624bc35f4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java +++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java @@ -16,10 +16,13 @@ */ package org.apache.activemq.artemis.core.protocol.openwire; +import java.nio.charset.StandardCharsets; + import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; import org.apache.activemq.artemis.core.server.MessageReference; @@ -182,6 +185,58 @@ public class OpenWireMessageConverterTest { assertEquals(PRODUCER_ID, messageDispatch.getMessage().getProducerId().toString()); } + @Test + public void testStringCorrelationId() throws Exception { + final String CORRELATION_ID = RandomUtil.randomString(); + + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + coreMessage.setCorrelationID(CORRELATION_ID); + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); + assertEquals(CORRELATION_ID, messageDispatch.getMessage().getCorrelationId()); + } + + @Test + public void testBytesCorrelationId() throws Exception { + final byte[] CORRELATION_ID = RandomUtil.randomString().getBytes(StandardCharsets.UTF_8); + + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + coreMessage.setCorrelationID(CORRELATION_ID); + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); + assertEquals(new String(CORRELATION_ID, StandardCharsets.UTF_8), messageDispatch.getMessage().getCorrelationId()); + } + + @Test + public void testInvalidUtf8BytesCorrelationId() throws Exception { + final byte[] CORRELATION_ID = new byte[]{1, (byte)0xFF, (byte)0xFF}; + + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + coreMessage.setCorrelationID(CORRELATION_ID); + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); + assertNull(messageDispatch.getMessage().getCorrelationId()); + } + + @Test + public void testLegacyCorrelationId() throws Exception { + final String CORRELATION_ID = RandomUtil.randomString(); + + ICoreMessage coreMessage = new CoreMessage().initBuffer(8); + coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(CORRELATION_ID)); + MessageReference messageReference = new MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class)); + AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class); + Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination); + MessageDispatch messageDispatch = OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, openWireFormat, amqConsumer, nodeUUID, 0); + assertEquals(CORRELATION_ID, messageDispatch.getMessage().getCorrelationId()); + } + @Test public void testMessageId() throws Exception { final String MESSAGE_ID = "ID:123:456:789"; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index d3320883e2..2fdad78462 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1608,4 +1608,7 @@ public interface ActiveMQServerLogger { @LogMessage(id = 224135, value = "nodeID {} is closing. Topology update ignored", level = LogMessage.Level.INFO) void nodeLeavingCluster(String nodeID); + + @LogMessage(id = 224136, value = "Skipping correlation ID when converting message to OpenWire since byte[] value is not valid UTF-8: {}", level = LogMessage.Level.WARN) + void unableToDecodeCorrelationId(String message); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java new file mode 100644 index 0000000000..31250693aa --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +/* + * JMS supports setting the correlation ID as a String or a byte[]. However, OpenWire only supports correlation ID as + * a String. When it is set as a byte[] the OpenWire JMS client just converts it to a UTF-8 encoded String, and + * therefore when it sends a JMS message with a correlation ID the broker can't tell if the value was set as a String + * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the incoming OpenWire value as a String. This + * doesn't cause any problems if the consumer is also OpenWire, but if the consumer is Core or AMQP (which both + * differentiate between String and binary values) then retrieving the correlation ID as a byte[] will fail and nothing + * can be done about it aside from updating the OpenWire protocol. + * + * Therefore, all the tests which involve the OpenWire JMS client using Message.setJMSCorrelationIDAsBytes() on a + * message sent to a different JMS implementation are ignored. The test are ignored rather that being completely + * removed to make clear this was an explicit decision not to test & support this use-case. + */ +public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport { + + private void testCorrelationIDAsBytesSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + + byte[] bytes = new byte[0xf + 1]; + for (int i = 0; i <= 0xf; i++) { + bytes[i] = (byte) i; + } + + MessageProducer producer = session.createProducer(queue); + Message message = session.createMessage(); + message.setJMSCorrelationIDAsBytes(bytes); + producer.send(message); + producer.close(); + + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); + + Message m = consumer.receive(5000); + Assert.assertNotNull("Could not receive message on consumer", m); + + Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws Throwable { + testCorrelationIDAsBytesSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws Throwable { + testCorrelationIDAsBytesSendReceive(createConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws Throwable { + testCorrelationIDAsBytesSendReceive(createConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws Throwable { + testCorrelationIDAsBytesSendReceive(createCoreConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws Throwable { + testCorrelationIDAsBytesSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws Throwable { + testCorrelationIDAsBytesSendReceive(createCoreConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire() throws Throwable { + testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + @Ignore + public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws Throwable { + testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createConnection()); + } + + @Test(timeout = 60000) + @Ignore + public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws Throwable { + testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createCoreConnection()); + } + + private void testCorrelationIDAsStringSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { + final String correlationId = RandomUtil.randomString(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + + MessageProducer producer = session.createProducer(queue); + Message message = session.createMessage(); + message.setJMSCorrelationID(correlationId); + producer.send(message); + producer.close(); + + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); + + Message m = consumer.receive(5000); + Assert.assertNotNull("Could not receive message on consumer", m); + + Assert.assertEquals(correlationId, m.getJMSCorrelationID()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws Throwable { + testCorrelationIDAsStringSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws Throwable { + testCorrelationIDAsStringSendReceive(createConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws Throwable { + testCorrelationIDAsStringSendReceive(createConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws Throwable { + testCorrelationIDAsStringSendReceive(createCoreConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws Throwable { + testCorrelationIDAsStringSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws Throwable { + testCorrelationIDAsStringSendReceive(createCoreConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire() throws Throwable { + testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws Throwable { + testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromOpenWireToCore() throws Throwable { + testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createCoreConnection()); + } +} \ No newline at end of file