From 3f9de5fa304b828494a4b1ba1f380fe269d91055 Mon Sep 17 00:00:00 2001 From: Robbie Gemmell Date: Mon, 1 Nov 2021 20:00:15 +0000 Subject: [PATCH] ARTEMIS-3461: add some tests and resolve various issues spotted with the prior changes - Avoid blowing up on string bodies of any size if the valueSizeLimit bits are configured to disable limit - Dont NPE if amqp-value + binary body is sent without a content-type, as it always should be. - Include expected prefix when adding delivery delay and ingress time annotations. - Use the actual name for ingress time annotation, as with all other annotations. - Use correct object type when testing equality with content-type value. - Use consistent case for 'groupId' in different properties. --- .../activemq/artemis/api/core/JsonUtil.java | 2 +- .../artemis/api/core/JsonUtilTest.java | 43 +- .../protocol/amqp/broker/AMQPMessage.java | 22 +- .../amqp/converter/AMQPMessageSupport.java | 10 +- .../protocol/amqp/broker/AMQPMessageTest.java | 410 +++++++++++++++++- 5 files changed, 468 insertions(+), 19 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java index 69dfa8af03..b1535bb3e6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java @@ -326,7 +326,7 @@ public final class JsonUtil { } public static String truncateString(final String str, final int valueSizeLimit) { - if (str.length() > valueSizeLimit) { + if (valueSizeLimit >= 0 && str.length() > valueSizeLimit) { return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString(); } else { return str; diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java index f6740ddd07..ab87cf2304 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/api/core/JsonUtilTest.java @@ -69,7 +69,8 @@ public class JsonUtilTest { Assert.assertEquals(6, jsonObject.getJsonArray("byteArray").size()); } - @Test public void testAddByteArrayToJsonArray() { + @Test + public void testAddByteArrayToJsonArray() { JsonArrayBuilder jsonArrayBuilder = JsonLoader.createArrayBuilder(); byte[] bytes = {0x0a, 0x1b, 0x2c, 0x3d, 0x4e, 0x5f}; @@ -79,4 +80,44 @@ public class JsonUtilTest { Assert.assertEquals(1, jsonArray.size()); } + + @Test + public void testTruncateUsingStringWithValueSizeLimit() { + String prefix = "12345"; + int valueSizeLimit = prefix.length(); + String remaining = "remaining"; + + String truncated = (String) JsonUtil.truncate(prefix + remaining, valueSizeLimit); + + String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more"; + Assert.assertEquals(expected, truncated); + } + + @Test + public void testTruncateUsingStringWithoutValueSizeLimit() { + String input = "testTruncateUsingStringWithoutValueSizeLimit"; + String notTruncated = (String) JsonUtil.truncate(input, -1); + + Assert.assertEquals(input, notTruncated); + } + + @Test + public void testTruncateStringWithValueSizeLimit() { + String prefix = "12345"; + int valueSizeLimit = prefix.length(); + String remaining = "remaining"; + + String truncated = JsonUtil.truncateString(prefix + remaining, valueSizeLimit); + + String expected = prefix + ", + " + String.valueOf(remaining.length()) + " more"; + Assert.assertEquals(expected, truncated); + } + + @Test + public void testTruncateStringWithoutValueSizeLimit() { + String input = "testTruncateStringWithoutValueSizeLimit"; + String notTruncated = JsonUtil.truncateString(input, -1); + + Assert.assertEquals(input, notTruncated); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 64da6ebc25..10904da662 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -881,7 +881,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. map.put(propertiesPrefix + "contentEncoding", properties.getContentEncoding().toString()); } if (properties.getGroupId() != null) { - map.put(propertiesPrefix + "groupID", properties.getGroupId()); + map.put(propertiesPrefix + "groupId", properties.getGroupId()); } if (properties.getGroupSequence() != null) { map.put(propertiesPrefix + "groupSequence", properties.getGroupSequence().intValue()); @@ -916,9 +916,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. map.put(prefix + "x-opt-delivery-time", deliveryTime); } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { long delay = ((Number) entry.getValue()).longValue(); - map.put("x-opt-delivery-delay", delay); + map.put(prefix + "x-opt-delivery-delay", delay); } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) { - map.put("X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue()); + map.put(prefix + AMQPMessageSupport.X_OPT_INGRESS_TIME, ((Number) entry.getValue()).longValue()); } else { try { map.put(prefix + key, entry.getValue()); @@ -1878,30 +1878,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } final Symbol contentType = properties != null ? properties.getContentType() : null; - final String contentTypeString = contentType != null ? contentType.toString() : null; if (m.getBody() instanceof Data && contentType != null) { - - if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) { + if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) { type = OBJECT_TYPE; - } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) { + } else if (AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE_SYMBOL.equals(contentType)) { type = BYTES_TYPE; } else { - Charset charset = getCharsetForTextualContent(contentTypeString); + Charset charset = getCharsetForTextualContent(contentType.toString()); if (StandardCharsets.UTF_8.equals(charset)) { type = TEXT_TYPE; } } - } else if (m.getBody() instanceof AmqpSequence) { - type = STREAM_TYPE; } else if (m.getBody() instanceof AmqpValue) { Object value = ((AmqpValue) m.getBody()).getValue(); if (value instanceof String) { type = TEXT_TYPE; } else if (value instanceof Binary) { - - if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) { + if (AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.equals(contentType)) { type = OBJECT_TYPE; } else { type = BYTES_TYPE; @@ -1911,7 +1906,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } else if (value instanceof Map) { type = MAP_TYPE; } + } else if (m.getBody() instanceof AmqpSequence) { + type = STREAM_TYPE; } + return type; } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java index c8e1eba1aa..1b47944269 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java @@ -66,6 +66,8 @@ public final class AMQPMessageSupport { public static SimpleString HDR_ORIGINAL_ADDRESS_ANNOTATION = SimpleString.toSimpleString("x-opt-ORIG-ADDRESS"); public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to"; + public static final String X_OPT_DELIVERY_TIME = "x-opt-delivery-time"; + public static final String X_OPT_DELIVERY_DELAY = "x-opt-delivery-delay"; // Message Properties used to map AMQP to JMS and back /** @@ -85,12 +87,12 @@ public final class AMQPMessageSupport { /** * Attribute used to mark the Application defined delivery time assigned to the message */ - public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol("x-opt-delivery-time"); + public static final Symbol SCHEDULED_DELIVERY_TIME = Symbol.getSymbol(X_OPT_DELIVERY_TIME); /** * Attribute used to mark the Application defined delivery time assigned to the message */ - public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol("x-opt-delivery-delay"); + public static final Symbol SCHEDULED_DELIVERY_DELAY = Symbol.getSymbol(X_OPT_DELIVERY_DELAY); /** * Attribute used to mark the Application defined delivery time assigned to the message @@ -204,10 +206,12 @@ public final class AMQPMessageSupport { public static final byte TEMP_QUEUE_TYPE = 0x02; public static final byte TEMP_TOPIC_TYPE = 0x03; + public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; + /** * Content type used to mark Data sections as containing arbitrary bytes. */ - public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; + public static final Symbol OCTET_STREAM_CONTENT_TYPE_SYMBOL = Symbol.valueOf(OCTET_STREAM_CONTENT_TYPE); /** * Lookup and return the correct Proton Symbol instance based on the given key. diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 3eb4d239a4..8a450ec0a9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -39,12 +39,17 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; + import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageIdHelper; import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; @@ -54,6 +59,7 @@ import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.collections.TypedProperties; import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; @@ -86,12 +92,26 @@ public class AMQPMessageTest { private static final String TEST_MESSAGE_ANNOTATION_KEY = "x-opt-test-annotation"; private static final String TEST_MESSAGE_ANNOTATION_VALUE = "test-annotation"; + private static final String TEST_MESSAGE_ANNOTATION_KEY2 = "x-opt-test-annotation2"; + private static final String TEST_MESSAGE_ANNOTATION_VALUE2 = "test-annotation2"; + + private static final String TEST_EXTRA_PROPERTY_KEY1 = "extraPropertyKey1"; + private static final String TEST_EXTRA_PROPERTY_VALUE1 = "extraPropertyValue1"; + private static final String TEST_EXTRA_PROPERTY_KEY2 = "extraPropertyKey2"; + private static final String TEST_EXTRA_PROPERTY_VALUE2 = "extraPropertyValue2"; private static final String TEST_APPLICATION_PROPERTY_KEY = "key-1"; private static final String TEST_APPLICATION_PROPERTY_VALUE = "value-1"; + private static final String TEST_APPLICATION_PROPERTY_KEY2 = "key-2"; + private static final String TEST_APPLICATION_PROPERTY_VALUE2 = "value-2"; private static final String TEST_STRING_BODY = "test-string-body"; + private static final String PROPERTY_MAP_APP_PROPERTIES_PREFIX = "applicationProperties."; + private static final String PROPERTY_MAP_PROPERTIES_PREFIX = "properties."; + private static final String PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX = "messageAnnotations."; + private static final String PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX = "extraProperties."; + private byte[] encodedProtonMessage; @Before @@ -2120,6 +2140,388 @@ public class AMQPMessageTest { assertEquals(annotationValue2, msgFromSendBuffer2.getDeliveryAnnotations().getValue().get(Symbol.getSymbol(annotationKey2))); } + //----- CompositeData handling -------------------------------------------// + + @Test + public void testToCompositeDataHeaderSectionDurable() throws Exception { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + // With section missing (defaults false) + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + CompositeData cd = decoded.toCompositeData(0, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.DURABLE)); + Object durableObj = cd.get(CompositeDataConstants.DURABLE); + assertTrue(durableObj instanceof Boolean); + + assertEquals(Boolean.FALSE, durableObj); + + // With section present, but value not set (defaults false) + Header protonHeader = new Header(); + protonMessage.setHeader(protonHeader); + + decoded = encodeAndDecodeMessage(protonMessage); + cd = decoded.toCompositeData(0, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.DURABLE)); + durableObj = cd.get(CompositeDataConstants.DURABLE); + assertTrue(durableObj instanceof Boolean); + + assertEquals(Boolean.FALSE, durableObj); + + // With section present, value set False explicitly + protonHeader = new Header(); + protonHeader.setDurable(Boolean.FALSE); + protonMessage.setHeader(protonHeader); + + decoded = encodeAndDecodeMessage(protonMessage); + cd = decoded.toCompositeData(0, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.DURABLE)); + durableObj = cd.get(CompositeDataConstants.DURABLE); + assertTrue(durableObj instanceof Boolean); + + assertEquals(Boolean.FALSE, durableObj); + + // With section present, value set True explicitly + protonHeader = new Header(); + protonHeader.setDurable(Boolean.TRUE); + protonMessage.setHeader(protonHeader); + + decoded = encodeAndDecodeMessage(protonMessage); + cd = decoded.toCompositeData(0, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.DURABLE)); + durableObj = cd.get(CompositeDataConstants.DURABLE); + assertTrue(durableObj instanceof Boolean); + + assertEquals(Boolean.TRUE, durableObj); + } + + @Test + public void testToCompositeDataHeaderSectionPriority() throws Exception { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + // With section missing (defaults 4) + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + CompositeData cd = decoded.toCompositeData(0, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY)); + Object priorityObj = cd.get(CompositeDataConstants.PRIORITY); + assertTrue(priorityObj instanceof Byte); + + assertEquals(Byte.valueOf((byte) 4), priorityObj); + + // With section present, but value not set (defaults 4) + Header protonHeader = new Header(); + protonMessage.setHeader(protonHeader); + + decoded = encodeAndDecodeMessage(protonMessage); + cd = decoded.toCompositeData(0, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY)); + priorityObj = cd.get(CompositeDataConstants.PRIORITY); + assertTrue(priorityObj instanceof Byte); + + assertEquals(Byte.valueOf((byte) 4), priorityObj); + + // With section present, value set to 5 explicitly + protonHeader = new Header(); + protonHeader.setPriority(UnsignedByte.valueOf((byte) 5)); + protonMessage.setHeader(protonHeader); + + decoded = encodeAndDecodeMessage(protonMessage); + cd = decoded.toCompositeData(0, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PRIORITY)); + priorityObj = cd.get(CompositeDataConstants.PRIORITY); + assertTrue(priorityObj instanceof Byte); + + assertEquals(Byte.valueOf((byte) 5), priorityObj); + } + + @Test + public void testToCompositeDataPropertiesSection() throws Exception { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + String testContentEncoding = "gzip"; + String testGroupId = "testGroupId"; + int testGroupSequence = 45678; + String testReplyToGroupId = "testReplyToGroupId"; + long testCreationTime = System.currentTimeMillis(); + long testExpiryTime = testCreationTime + 5000; + String testSubject = "testSubject"; + String testMessageId = "testMessageId"; + + Properties protonProperties = new Properties(); + protonProperties.setContentType(Symbol.valueOf(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)); + protonProperties.setContentEncoding(Symbol.valueOf(testContentEncoding)); + protonProperties.setGroupId(testGroupId); + protonProperties.setGroupSequence(UnsignedInteger.valueOf(testGroupSequence)); + protonProperties.setReplyToGroupId(testReplyToGroupId); + protonProperties.setCreationTime(new Date(testCreationTime)); + protonProperties.setAbsoluteExpiryTime(new Date(testExpiryTime)); + protonProperties.setSubject(testSubject); + protonProperties.setTo(TEST_TO_ADDRESS); + protonProperties.setMessageId(testMessageId); + + protonMessage.setProperties(protonProperties); + + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + CompositeData cd = decoded.toCompositeData(-1, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES)); + Object propsObject = cd.get(CompositeDataConstants.PROPERTIES); + assertTrue(propsObject instanceof String); + String properties = (String) propsObject; + + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentType" + "=" + AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "contentEncoding" + "=" + testContentEncoding)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupId" + "=" + testGroupId)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "groupSequence" + "=" + testGroupSequence)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "replyToGroupId" + "=" + testReplyToGroupId)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "creationTime" + "=" + testCreationTime)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "absoluteExpiryTime" + "=" + testExpiryTime)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "to" + "=" + TEST_TO_ADDRESS)); + assertTrue(properties.contains(PROPERTY_MAP_PROPERTIES_PREFIX + "subject" + "=" + testSubject)); + + // TODO: should these fields be included in the 'properties' string and tested above? + // Some are shown elsewhere in a way, others missing entirely. Eg: + // + // correlation-id: not included. + // message-id: included'ish, with an ID: prefix added, as the CompositeDataConstants.USER_ID. + // reply-to: not included, though the replyToGroupId is given as shown above. + // user-id: not included. + + // Some fields of the properties section already align with fields given + // their own top level entries of the CompositeData, which remain: + + // The message-id is presented via the 'user id' field, inc an added prefix. + assertTrue(cd.containsKey(CompositeDataConstants.USER_ID)); + Object messageIdObj = cd.get(CompositeDataConstants.USER_ID); + assertTrue(messageIdObj instanceof String); + + assertEquals(AMQPMessageIdHelper.JMS_ID_PREFIX + testMessageId, messageIdObj); + + // The creation-time is duplicated as the 'timestamp' field + assertTrue(cd.containsKey(CompositeDataConstants.TIMESTAMP)); + Object timestampObj = cd.get(CompositeDataConstants.TIMESTAMP); + assertTrue(timestampObj instanceof Long); + + assertEquals(testCreationTime, timestampObj); + } + + @Test + public void testToCompositeDataApplicationPropertiesSection() throws Exception { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + Map appPropsMap = new HashMap<>(); + appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY, TEST_APPLICATION_PROPERTY_VALUE); + appPropsMap.put(TEST_APPLICATION_PROPERTY_KEY2, TEST_APPLICATION_PROPERTY_VALUE2); + ApplicationProperties appProps = new ApplicationProperties(appPropsMap); + + protonMessage.setApplicationProperties(appProps); + + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + CompositeData cd = decoded.toCompositeData(-1, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES)); + Object propsObject = cd.get(CompositeDataConstants.PROPERTIES); + assertTrue(propsObject instanceof String); + String properties = (String) propsObject; + + assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX + + TEST_APPLICATION_PROPERTY_KEY + "=" + TEST_APPLICATION_PROPERTY_VALUE)); + assertTrue(properties.contains(PROPERTY_MAP_APP_PROPERTIES_PREFIX + + TEST_APPLICATION_PROPERTY_KEY2 + "=" + TEST_APPLICATION_PROPERTY_VALUE2)); + } + + @Test + public void testToCompositeDataMessageAnnotationSection() throws Exception { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + Map annotationsMap = new HashMap<>(); + annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY), TEST_MESSAGE_ANNOTATION_VALUE); + annotationsMap.put(Symbol.valueOf(TEST_MESSAGE_ANNOTATION_KEY2), TEST_MESSAGE_ANNOTATION_VALUE2); + MessageAnnotations annotations = new MessageAnnotations(annotationsMap); + + protonMessage.setMessageAnnotations(annotations); + + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + CompositeData cd = decoded.toCompositeData(-1, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES)); + Object propsObject = cd.get(CompositeDataConstants.PROPERTIES); + assertTrue(propsObject instanceof String); + String properties = (String) propsObject; + + assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX + + TEST_MESSAGE_ANNOTATION_KEY + "=" + TEST_MESSAGE_ANNOTATION_VALUE)); + assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX + + TEST_MESSAGE_ANNOTATION_KEY2 + "=" + TEST_MESSAGE_ANNOTATION_VALUE2)); + + // Now try some specific annotations with their own handling + long testIngressTime = System.currentTimeMillis(); + long testDeliveryTime = System.currentTimeMillis() + 5678; + long testDeliveryDelay = 6789; + + annotationsMap = new HashMap<>(); + annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_INGRESS_TIME), testIngressTime); + annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_TIME), testDeliveryTime); + annotationsMap.put(Symbol.valueOf(AMQPMessageSupport.X_OPT_DELIVERY_DELAY), testDeliveryDelay); + annotations = new MessageAnnotations(annotationsMap); + protonMessage.setMessageAnnotations(annotations); + + decoded = encodeAndDecodeMessage(protonMessage); + + cd = decoded.toCompositeData(-1, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES)); + propsObject = cd.get(CompositeDataConstants.PROPERTIES); + assertTrue(propsObject instanceof String); + properties = (String) propsObject; + + assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX + + AMQPMessageSupport.X_OPT_INGRESS_TIME + "=" + testIngressTime)); + assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX + + AMQPMessageSupport.X_OPT_DELIVERY_TIME + "=" + testDeliveryTime)); + assertTrue(properties.contains(PROPERTY_MAP_MESSAGE_ANNOTATIONS_PREFIX + + AMQPMessageSupport.X_OPT_DELIVERY_DELAY + "=" + testDeliveryDelay)); + } + + @Test + public void testToCompositeDataExtraProperties() throws Exception { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + TypedProperties extraProperties = new TypedProperties(); + extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY1), TEST_EXTRA_PROPERTY_VALUE1); + extraProperties.putProperty(new SimpleString(TEST_EXTRA_PROPERTY_KEY2), TEST_EXTRA_PROPERTY_VALUE2); + + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage, extraProperties); + + CompositeData cd = decoded.toCompositeData(-1, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.PROPERTIES)); + Object propsObject = cd.get(CompositeDataConstants.PROPERTIES); + assertTrue(propsObject instanceof String); + String properties = (String) propsObject; + + assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX + + TEST_EXTRA_PROPERTY_KEY1 + "=" + TEST_EXTRA_PROPERTY_VALUE1)); + assertTrue(properties.contains(PROPERTY_MAP_EXTRA_PROPERTIES_PREFIX + + TEST_EXTRA_PROPERTY_KEY2 + "=" + TEST_EXTRA_PROPERTY_VALUE2)); + } + + @Test + public void testToCompositeDataWithDataBodySectionWithoutContentType() throws Exception { + doToCompositeDataWithDataBodySectionTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE); + } + + @Test + public void testToCompositeDataWithDataBodySectionWithOctetStreamContentType() throws Exception { + doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE); + } + + @Test + public void testToCompositeDataWithDataBodySectionWithTextPlainContentType() throws Exception { + doToCompositeDataWithDataBodySectionTestImpl("text/plain", org.apache.activemq.artemis.api.core.Message.TEXT_TYPE); + } + + @Test + public void testToCompositeDataWithDataBodySectionWithSerializedObjectContentType() throws Exception { + doToCompositeDataWithDataBodySectionTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE); + } + + private void doToCompositeDataWithDataBodySectionTestImpl(String contentType, byte expectedMessageType) throws OpenDataException { + Message protonMessage = Message.Factory.create(); + + // Not the right payload for some of the content types,but it + // doesnt matter, mainly checking type value and for lack of NPEs. + String bytesSource = "testPayloadBytes"; + String expectedBodyText = "Data{" + bytesSource + "}"; + Data body = new Data(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8))); + + protonMessage.setBody(body); + protonMessage.setContentType(contentType); + + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + CompositeData cd = decoded.toCompositeData(-1, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY)); + assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY)); + + assertTrue(cd.containsKey(CompositeDataConstants.TYPE)); + assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE)); + } + + @Test + public void testToCompositeDataWithAmqpValueBinaryBodySectionWithoutContentType() throws Exception { + doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE); + } + + @Test + public void testToCompositeDataWithAmqpValueBinaryBodySectionWithSerializedObjectContentType() throws Exception { + // Shouldnt really get in this situation, not meant to use content-type without the Data body section. + doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE); + } + + private void doToCompositeDataWithAmqpValueBodySectionWithBinaryTestImpl(String contentType, byte expectedMessageType) throws OpenDataException { + Message protonMessage = Message.Factory.create(); + + // Not the right payload for some of the content types,but it + // doesnt matter, mainly checking type value and for lack of NPEs. + String bytesSource = "testPayloadBytes"; + AmqpValue body = new AmqpValue(new Binary(bytesSource.getBytes(StandardCharsets.UTF_8))); + + protonMessage.setBody(body); + protonMessage.setContentType(contentType); + + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + CompositeData cd = decoded.toCompositeData(-1, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY)); + assertEquals(bytesSource, cd.get(CompositeDataConstants.TEXT_BODY)); + + assertTrue(cd.containsKey(CompositeDataConstants.TYPE)); + assertEquals(expectedMessageType, cd.get(CompositeDataConstants.TYPE)); + } + + @Test + public void testToCompositeDataWithStringBodyWithoutValueSizeLimit() throws Exception { + doToCompositeDataWithStringBodyValueSizeLimitTestImpl(-1, TEST_STRING_BODY); + } + + @Test + public void testToCompositeDataWithStringBodyWithValueSizeLimit() throws Exception { + int limit = 11; + int testBodyLength = TEST_STRING_BODY.length(); + assertTrue(testBodyLength > limit); + + String expected = TEST_STRING_BODY.substring(0, limit) + ", + " + String.valueOf(testBodyLength - limit) + " more"; + + doToCompositeDataWithStringBodyValueSizeLimitTestImpl(limit, expected); + } + + private void doToCompositeDataWithStringBodyValueSizeLimitTestImpl(int fieldsLimit, String expectedBodyText) throws OpenDataException { + Message protonMessage = Message.Factory.create(); + protonMessage.setBody(new AmqpValue(TEST_STRING_BODY)); + + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + CompositeData cd = decoded.toCompositeData(fieldsLimit, 0); + + assertTrue(cd.containsKey(CompositeDataConstants.TEXT_BODY)); + assertEquals(expectedBodyText, cd.get(CompositeDataConstants.TEXT_BODY)); + + assertTrue(cd.containsKey(CompositeDataConstants.TYPE)); + assertEquals(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, cd.get(CompositeDataConstants.TYPE)); + } + //----- Test Support ------------------------------------------------------// private MessageImpl createProtonMessage() { @@ -2483,13 +2885,17 @@ public class AMQPMessageTest { return bytes; } - private AMQPStandardMessage encodeAndDecodeMessage(MessageImpl message) { + private AMQPStandardMessage encodeAndDecodeMessage(Message message) { + return encodeAndDecodeMessage(message, null); + } + + private AMQPStandardMessage encodeAndDecodeMessage(Message message, TypedProperties extraProperties) { ByteBuf nettyBuffer = Unpooled.buffer(1500); message.encode(new NettyWritable(nettyBuffer)); byte[] bytes = new byte[nettyBuffer.writerIndex()]; nettyBuffer.readBytes(bytes); - return new AMQPStandardMessage(0, bytes, null); + return new AMQPStandardMessage(0, bytes, extraProperties); } }