diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java index dee38d3554..c66d33214c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java @@ -141,7 +141,7 @@ public class MessageOpenTypeFactory { rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1); } - Map propertyMap = m.toPropertyMap(valueSizeLimit); + Map propertyMap = expandProperties(m, valueSizeLimit); rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit)); @@ -162,6 +162,10 @@ public class MessageOpenTypeFactory { return rc; } + protected Map expandProperties(M m, int valueSizeLimit) { + return m.toPropertyMap(valueSizeLimit); + } + protected String toString(Object value) { if (value == null) { return null; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 4f0c4d6005..36b03ea3e1 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -602,8 +602,13 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } @Override - public long getPersistentSize() throws ActiveMQException { - return 0; + public long getPersistentSize() { + try { + return largeBody.getBodySize(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + return 0; + } } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 91a3adb890..61682b10f8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -834,6 +834,25 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public Map toPropertyMap(int valueSizeLimit) { + return toPropertyMap(false, valueSizeLimit); + } + + private Map toPropertyMap(boolean expandPropertyType, int valueSizeLimit) { + String extraPropertiesPrefix; + String applicationPropertiesPrefix; + String annotationPrefix; + String propertiesPrefix; + if (expandPropertyType) { + extraPropertiesPrefix = "extraProperties."; + applicationPropertiesPrefix = "applicationProperties."; + annotationPrefix = "messageAnnotations."; + propertiesPrefix = "properties."; + } else { + extraPropertiesPrefix = ""; + applicationPropertiesPrefix = ""; + annotationPrefix = ""; + propertiesPrefix = ""; + } Map map = new HashMap<>(); for (SimpleString name : getPropertyNames()) { Object value = getObjectProperty(name.toString()); @@ -842,45 +861,45 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. value = ((Binary)value).getArray(); } value = JsonUtil.truncate(value, valueSizeLimit); - map.put("applicationProperties." + name, value); + map.put(applicationPropertiesPrefix + name, value); } TypedProperties extraProperties = getExtraProperties(); if (extraProperties != null) { extraProperties.forEach((s, o) -> { - map.put("extraProperties." + s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit)); + map.put(extraPropertiesPrefix + s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit)); }); } - addAnnotationsAsProperties(map, messageAnnotations); + addAnnotationsAsProperties(annotationPrefix, map, messageAnnotations); if (properties != null) { if (properties.getContentType() != null) { - map.put("properties.contentType", properties.getContentType().toString()); + map.put(propertiesPrefix + "contentType", properties.getContentType().toString()); } if (properties.getContentEncoding() != null) { - map.put("properties.contentEncoding", properties.getContentEncoding().toString()); + map.put(propertiesPrefix + "contentEncoding", properties.getContentEncoding().toString()); } if (properties.getGroupId() != null) { - map.put("properties.groupID", properties.getGroupId()); + map.put(propertiesPrefix + "groupID", properties.getGroupId()); } if (properties.getGroupSequence() != null) { - map.put("properties.groupSequence", properties.getGroupSequence().intValue()); + map.put(propertiesPrefix + "groupSequence", properties.getGroupSequence().intValue()); } if (properties.getReplyToGroupId() != null) { - map.put("properties.replyToGroupId", properties.getReplyToGroupId()); + map.put(propertiesPrefix + "replyToGroupId", properties.getReplyToGroupId()); } if (properties.getCreationTime() != null) { - map.put("properties.creationTime", properties.getCreationTime().getTime()); + map.put(propertiesPrefix + "creationTime", properties.getCreationTime().getTime()); } if (properties.getAbsoluteExpiryTime() != null) { - map.put("properties.absoluteExpiryTime", properties.getCreationTime().getTime()); + map.put(propertiesPrefix + "absoluteExpiryTime", properties.getCreationTime().getTime()); } if (properties.getTo() != null) { - map.put("properties.to", properties.getTo()); + map.put(propertiesPrefix + "to", properties.getTo()); } if (properties.getSubject() != null) { - map.put("properties.subject", properties.getSubject()); + map.put(propertiesPrefix + "subject", properties.getSubject()); } } @@ -888,21 +907,21 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. } - protected static void addAnnotationsAsProperties(Map map, MessageAnnotations annotations) { + protected static void addAnnotationsAsProperties(String prefix, Map map, MessageAnnotations annotations) { if (annotations != null && annotations.getValue() != null) { for (Map.Entry entry : annotations.getValue().entrySet()) { String key = entry.getKey().toString(); if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { long deliveryTime = ((Number) entry.getValue()).longValue(); - map.put("message-annotation.x-opt-delivery-time", deliveryTime); + map.put(prefix + "x-opt-delivery-time", deliveryTime); } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { long delay = ((Number) entry.getValue()).longValue(); - map.put("message-annotation.x-opt-delivery-delay", delay); + map.put("x-opt-delivery-delay", delay); } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) { - map.put("message-annotation.X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue()); + map.put("X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue()); } else { try { - map.put("message-annotation." + key, entry.getValue()); + map.put(prefix + key, entry.getValue()); } catch (ActiveMQPropertyConversionException e) { logger.warn(e.getMessage(), e); } @@ -1843,6 +1862,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return rc; } + @Override + protected Map expandProperties(AMQPMessage m, int valueSizeLimit) { + return m.toPropertyMap(true, valueSizeLimit); + } + private byte getType(AMQPMessage m, Properties properties) { if (m.isLargeMessage()) { return DEFAULT_TYPE; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java index 848e1908ec..f32153a4c2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java @@ -80,12 +80,22 @@ public class JMXManagementTest extends JMSClientTestSupport { } //before commit - assertEquals(num, queueControl.getDeliveringCount()); + Wait.assertEquals(num, () -> queueControl.getDeliveringCount()); - Map[]> result = queueControl.listDeliveringMessages(); - assertEquals(1, result.size()); + Map[]> result = null; + Map[] msgMaps = null; + // we might need some retry, and Wait.assert won't be as efficient on this case + for (int i = 0; i < 10; i++) { + result = queueControl.listDeliveringMessages(); + assertEquals(1, result.size()); - Map[] msgMaps = result.entrySet().iterator().next().getValue(); + msgMaps = result.entrySet().iterator().next().getValue(); + if (msgMaps.length == num) { + break; + } else { + Thread.sleep(100); + } + } assertEquals(num, msgMaps.length); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java index 2af5653104..dc5cf9a991 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java @@ -359,7 +359,8 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport { assertEquals(1, browseResult.length); if ((boolean) browseResult[0].get("largeMessage")) { - assertTrue(browseResult[0].containsKey("text")); + // The AMQPMessage will part the body as text (...Large Message...) while core will parse it differently + assertTrue(browseResult[0].containsKey("text") || browseResult[0].containsKey("BodyPreview")); } connection = client.createConnection();