diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 981f55bf2f..3d4438ecc0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -1344,7 +1344,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { Map rc = super.getFields(m, valueSizeLimit, delivery); rc.put(CompositeDataConstants.TYPE, m.getType()); if (!m.isLargeMessage()) { - ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer(); + ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; bodyCopy.readBytes(bytes); rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); @@ -1370,7 +1370,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) { rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]"); } else { - SimpleString text = m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString(); + SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : ""); } } else { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 54aa270d3c..91a3adb890 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -842,34 +842,45 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. value = ((Binary)value).getArray(); } value = JsonUtil.truncate(value, valueSizeLimit); - map.put(name.toString(), value); + map.put("applicationProperties." + name, value); } TypedProperties extraProperties = getExtraProperties(); if (extraProperties != null) { extraProperties.forEach((s, o) -> { - map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit)); + map.put("extraProperties." + s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit)); }); } - if (!isLargeMessage()) { - addAnnotationsAsProperties(map, messageAnnotations); - } + + addAnnotationsAsProperties(map, messageAnnotations); if (properties != null) { if (properties.getContentType() != null) { - map.put("properties.getContentType()", properties.getContentType().toString()); + map.put("properties.contentType", properties.getContentType().toString()); } if (properties.getContentEncoding() != null) { - map.put("properties.getContentEncoding()", properties.getContentEncoding().toString()); + map.put("properties.contentEncoding", properties.getContentEncoding().toString()); } if (properties.getGroupId() != null) { - map.put("properties.getGroupID()", properties.getGroupId()); + map.put("properties.groupID", properties.getGroupId()); } if (properties.getGroupSequence() != null) { - map.put("properties.getGroupSequence()", properties.getGroupSequence().intValue()); + map.put("properties.groupSequence", properties.getGroupSequence().intValue()); } if (properties.getReplyToGroupId() != null) { - map.put("properties.getReplyToGroupId()", properties.getReplyToGroupId()); + map.put("properties.replyToGroupId", properties.getReplyToGroupId()); + } + if (properties.getCreationTime() != null) { + map.put("properties.creationTime", properties.getCreationTime().getTime()); + } + if (properties.getAbsoluteExpiryTime() != null) { + map.put("properties.absoluteExpiryTime", properties.getCreationTime().getTime()); + } + if (properties.getTo() != null) { + map.put("properties.to", properties.getTo()); + } + if (properties.getSubject() != null) { + map.put("properties.subject", properties.getSubject()); } } @@ -883,18 +894,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. String key = entry.getKey().toString(); if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { long deliveryTime = ((Number) entry.getValue()).longValue(); - map.put("annotation x-opt-delivery-time", deliveryTime); + map.put("message-annotation.x-opt-delivery-time", deliveryTime); } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { long delay = ((Number) entry.getValue()).longValue(); - if (delay > 0) { - map.put("annotation x-opt-delivery-delay", System.currentTimeMillis() + delay); - } + map.put("message-annotation.x-opt-delivery-delay", delay); } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) { - map.put("annotation X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue()); + map.put("message-annotation.X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue()); } else { try { - map.put("annotation " + key, entry.getValue()); + map.put("message-annotation." + key, entry.getValue()); } catch (ActiveMQPropertyConversionException e) { + logger.warn(e.getMessage(), e); } } } @@ -1807,12 +1817,12 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public Map getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException { - Map rc = super.getFields(m, valueSizeLimit, delivery); - if (!m.isLargeMessage()) { m.ensureScanning(); } + Map rc = super.getFields(m, valueSizeLimit, delivery); + Properties properties = m.getCurrentProperties(); byte type = getType(m, properties); @@ -1822,12 +1832,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. if (m.isLargeMessage()) { rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ..."); } else { - if (m.getBody() instanceof AmqpValue) { - Object amqpValue = ((AmqpValue) m.getBody()).getValue(); - - rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit)); + Object amqpValue; + if (m.getBody() instanceof AmqpValue && (amqpValue = ((AmqpValue) m.getBody()).getValue()) != null) { + rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(String.valueOf(amqpValue), valueSizeLimit)); } else { - rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit)); + rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(String.valueOf(m.getBody()), valueSizeLimit)); } } @@ -1840,10 +1849,14 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } byte type = BYTES_TYPE; + if (m.getBody() == null) { + return DEFAULT_TYPE; + } + final Symbol contentType = properties != null ? properties.getContentType() : null; final String contentTypeString = contentType != null ? contentType.toString() : null; - if (m.getBody() instanceof Data) { + if (m.getBody() instanceof Data && contentType != null) { if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) { type = OBJECT_TYPE; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java index 3ab129be7e..2af5653104 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java @@ -359,7 +359,7 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport { assertEquals(1, browseResult.length); if ((boolean) browseResult[0].get("largeMessage")) { - assertTrue(browseResult[0].containsKey("BodyPreview")); + assertTrue(browseResult[0].containsKey("text")); } connection = client.createConnection();