From a833d95c1fb7fd5ab5a5220a64df80d06815da47 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 14 Oct 2021 13:25:56 -0400 Subject: [PATCH] ARTEMIS-3461 Generalize MBean Support on Messages and avoid converstion to core on AMQP Messages on console browsing Done in collaboration with Erwin Dondorp through https://github.com/apache/activemq-artemis/pull/3794/ --- .../activemq/artemis/api/core/JsonUtil.java | 13 +- .../activemq/artemis/api/core/Message.java | 6 + .../core/message/impl/CoreMessage.java | 95 ++++++ .../openmbean/CompositeDataConstants.java | 69 ++++ .../openmbean/MessageOpenTypeFactory.java | 224 +++++++++++++ .../protocol/amqp/broker/AMQPMessage.java | 164 ++++++++++ .../management/impl/QueueControlImpl.java | 11 +- .../openmbean/CompositeDataConstants.java | 63 +--- .../impl/openmbean/OpenTypeSupport.java | 305 ------------------ .../impl/openmbean/OpenTypeSupportTest.java | 3 +- .../management/QueueControlTest.java | 131 +++++++- 11 files changed, 713 insertions(+), 371 deletions(-) create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java create mode 100644 artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java index 58cac51541..69dfa8af03 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java @@ -325,14 +325,19 @@ public final class JsonUtil { private JsonUtil() { } + public static String truncateString(final String str, final int valueSizeLimit) { + if (str.length() > valueSizeLimit) { + return new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString(); + } else { + return str; + } + } + public static Object truncate(final Object value, final int valueSizeLimit) { Object result = value; if (valueSizeLimit >= 0) { if (String.class.equals(value.getClass())) { - String str = (String) value; - if (str.length() > valueSizeLimit) { - result = new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString(); - } + result = truncateString((String)value, valueSizeLimit); } else if (value.getClass().isArray()) { if (byte[].class.equals(value.getClass())) { if (((byte[]) value).length > valueSizeLimit) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 5525fee84c..bef0fa417b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.api.core; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; @@ -769,6 +771,10 @@ public interface Message { /** This should make you convert your message into Core format. */ ICoreMessage toCore(); + default CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException { + return null; + } + /** This should make you convert your message into Core format. */ ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 1129f558a0..1d5acdae20 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -17,8 +17,15 @@ package org.apache.activemq.artemis.core.message.impl; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.SimpleType; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.zip.DataFormatException; @@ -33,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; @@ -40,6 +48,8 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.message.LargeBodyReader; +import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; +import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -1216,6 +1226,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return this; } + @Override public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { return this; @@ -1290,4 +1301,88 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return body; } + + + // ******************************************************************************************************************************* + // Composite Data implementation + + private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory(); + private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory(); + + + @Override + public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException { + CompositeType ct; + Map fields; + byte type = getType(); + switch (type) { + case Message.TEXT_TYPE: + ct = TEXT_FACTORY.getCompositeType(); + fields = TEXT_FACTORY.getFields(this, fieldsLimit, deliveryCount); + break; + default: + ct = BYTES_FACTORY.getCompositeType(); + fields = BYTES_FACTORY.getFields(this, fieldsLimit, deliveryCount); + break; + } + return new CompositeDataSupport(ct, fields); + + } + + static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory { + protected ArrayType body; + + @Override + protected void init() throws OpenDataException { + super.init(); + body = new ArrayType(SimpleType.BYTE, true); + addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body); + } + + @Override + public Map getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException { + Map rc = super.getFields(m, valueSizeLimit, delivery); + rc.put(CompositeDataConstants.TYPE, m.getType()); + if (!m.isLargeMessage()) { + ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer(); + byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; + bodyCopy.readBytes(bytes); + rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); + } else { + rc.put(CompositeDataConstants.BODY, new byte[0]); + } + return rc; + } + } + + static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory { + @Override + protected void init() throws OpenDataException { + super.init(); + addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING); + } + + @Override + public Map getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException { + Map rc = super.getFields(m, valueSizeLimit, delivery); + rc.put(CompositeDataConstants.TYPE, m.getType()); + if (!m.isLargeMessage()) { + if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) { + rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]"); + } else { + SimpleString text = m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString(); + rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : ""); + } + } else { + rc.put(CompositeDataConstants.TEXT_BODY, "[large message]"); + } + return rc; + } + } + + // Composite Data implementation + // ******************************************************************************************************************************* + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java new file mode 100644 index 0000000000..84fbbfd7d3 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.message.openmbean; + +public interface CompositeDataConstants { + + String ADDRESS = "address"; + String MESSAGE_ID = "messageID"; + String USER_ID = "userID"; + String TYPE = "type"; + String DURABLE = "durable"; + String EXPIRATION = "expiration"; + String PRIORITY = "priority"; + String REDELIVERED = "redelivered"; + String TIMESTAMP = "timestamp"; + String BODY = "BodyPreview"; + String TEXT_BODY = "text"; + String LARGE_MESSAGE = "largeMessage"; + String PERSISTENT_SIZE = "persistentSize"; + String PROPERTIES = "PropertiesText"; + + String ADDRESS_DESCRIPTION = "The Address"; + String MESSAGE_ID_DESCRIPTION = "The message ID"; + String USER_ID_DESCRIPTION = "The user ID"; + String TYPE_DESCRIPTION = "The message type"; + String DURABLE_DESCRIPTION = "Is the message durable"; + String EXPIRATION_DESCRIPTION = "The message expiration"; + String PRIORITY_DESCRIPTION = "The message priority"; + String REDELIVERED_DESCRIPTION = "Has the message been redelivered"; + String TIMESTAMP_DESCRIPTION = "The message timestamp"; + String BODY_DESCRIPTION = "The message body"; + String LARGE_MESSAGE_DESCRIPTION = "Is the message treated as a large message"; + String PERSISTENT_SIZE_DESCRIPTION = "The message size when persisted on disk"; + String PROPERTIES_DESCRIPTION = "The properties text"; + + // User properties + String STRING_PROPERTIES = "StringProperties"; + String BOOLEAN_PROPERTIES = "BooleanProperties"; + String BYTE_PROPERTIES = "ByteProperties"; + String SHORT_PROPERTIES = "ShortProperties"; + String INT_PROPERTIES = "IntProperties"; + String LONG_PROPERTIES = "LongProperties"; + String FLOAT_PROPERTIES = "FloatProperties"; + String DOUBLE_PROPERTIES = "DoubleProperties"; + + String STRING_PROPERTIES_DESCRIPTION = "User String Properties"; + String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties"; + String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties"; + String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties"; + String INT_PROPERTIES_DESCRIPTION = "User Int Properties"; + String LONG_PROPERTIES_DESCRIPTION = "User Long Properties"; + String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties"; + String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties"; + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java new file mode 100644 index 0000000000..0028051c91 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.message.openmbean; + +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.JsonUtil; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.jboss.logging.Logger; + +public class MessageOpenTypeFactory { + + private static final Logger logger = Logger.getLogger(MessageOpenTypeFactory.class); + + public MessageOpenTypeFactory() { + try { + init(); + compositeType = createCompositeType(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + } + } + + + private CompositeType compositeType; + private final List itemNamesList = new ArrayList<>(); + private final List itemDescriptionsList = new ArrayList<>(); + private final List itemTypesList = new ArrayList<>(); + + protected TabularType stringPropertyTabularType; + protected TabularType booleanPropertyTabularType; + protected TabularType bytePropertyTabularType; + protected TabularType shortPropertyTabularType; + protected TabularType intPropertyTabularType; + protected TabularType longPropertyTabularType; + protected TabularType floatPropertyTabularType; + protected TabularType doublePropertyTabularType; + protected Object[][] typedPropertyFields; + + protected String getTypeName() { + return Message.class.getName(); + } + + public CompositeType getCompositeType() throws OpenDataException { + return compositeType; + } + + protected void init() throws OpenDataException { + + addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING); + addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG); + addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE); + addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG); + addItem(CompositeDataConstants.LARGE_MESSAGE, CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN); + addItem(CompositeDataConstants.PERSISTENT_SIZE, CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG); + + addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING); + + // now lets expose the type safe properties + stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); + booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); + bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); + shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); + intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); + longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG); + floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); + doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); + + addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType); + addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType); + addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType); + addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType); + addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType); + addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); + addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); + addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); + + typedPropertyFields = new Object[][] { + {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class}, + {CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class}, + {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class}, + {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class}, + {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class}, + {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class}, + {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class}, + {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class} + }; + + } + + public Map getFields(M m, int valueSizeLimit, int deliveryCount) throws OpenDataException { + Map rc = new HashMap<>(); + rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); + if (m.getUserID() != null) { + rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString()); + } else { + rc.put(CompositeDataConstants.USER_ID, ""); + } + rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString()); + rc.put(CompositeDataConstants.DURABLE, m.isDurable()); + rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration()); + rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp()); + rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); + rc.put(CompositeDataConstants.REDELIVERED, deliveryCount > 1); + rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage()); + try { + rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize()); + } catch (final ActiveMQException e1) { + rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1); + } + + Map propertyMap = m.toPropertyMap(valueSizeLimit); + + rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit)); + + // only populate if there are some values + TabularDataSupport tabularData; + for (Object[] typedPropertyInfo : typedPropertyFields) { + tabularData = null; + try { + tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]); + } catch (Exception ignored) { + } + if (tabularData != null && !tabularData.isEmpty()) { + rc.put((String) typedPropertyInfo[0], tabularData); + } else { + rc.put((String) typedPropertyInfo[0], null); + } + } + return rc; + } + + protected String toString(Object value) { + if (value == null) { + return null; + } + return value.toString(); + } + + protected CompositeType createCompositeType() throws OpenDataException { + String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); + String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); + OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); + return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes); + } + + protected String getDescription() { + return getTypeName(); + } + + protected TabularType createTabularType(Class type, OpenType openType) throws OpenDataException { + String typeName = "java.util.Map"; + String[] keyValue = new String[]{"key", "value"}; + OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; + CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); + return new TabularType(typeName, typeName, rowType, new String[]{"key"}); + } + + protected TabularDataSupport createTabularData(Map entries, + TabularType type, + Class valueType) throws IOException, OpenDataException { + TabularDataSupport answer = new TabularDataSupport(type); + + for (String key : entries.keySet()) { + Object value = entries.get(key); + if (valueType.isInstance(value)) { + CompositeDataSupport compositeData = createTabularRowValue(type, key, value); + answer.put(compositeData); + } else if (valueType == String.class && value instanceof SimpleString) { + CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString()); + answer.put(compositeData); + } + } + return answer; + } + + protected CompositeDataSupport createTabularRowValue(TabularType type, + String key, + Object value) throws OpenDataException { + Map fields = new HashMap<>(); + fields.put("key", key); + fields.put("value", value); + return new CompositeDataSupport(type.getRowType(), fields); + } + + protected void addItem(String name, String description, OpenType type) { + itemNamesList.add(name); + itemDescriptionsList.add(description); + itemTypesList.add(type); + } +} + diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 3926fa5d94..0609a76b43 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.SimpleType; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +40,8 @@ import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; +import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; @@ -72,6 +80,8 @@ import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent; + /** * See AMQP v1.0 message format *

@@ -834,9 +844,64 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
          value = JsonUtil.truncate(value, valueSizeLimit);
          map.put(name.toString(), value);
       }
+
+      TypedProperties extraProperties = getExtraProperties();
+      if (extraProperties != null) {
+         extraProperties.forEach((s, o) -> {
+            map.put(s.toString(), JsonUtil.truncate(o.toString(), valueSizeLimit));
+         });
+      }
+      if (!isLargeMessage()) {
+         addAnnotationsAsProperties(map, messageAnnotations);
+      }
+
+      if (properties != null) {
+         if (properties.getContentType() != null) {
+            map.put("properties.getContentType()", properties.getContentType().toString());
+         }
+         if (properties.getContentEncoding() != null) {
+            map.put("properties.getContentEncoding()", properties.getContentEncoding().toString());
+         }
+         if (properties.getGroupId() != null) {
+            map.put("properties.getGroupID()", properties.getGroupId());
+         }
+         if (properties.getGroupSequence() != null) {
+            map.put("properties.getGroupSequence()", properties.getGroupSequence().intValue());
+         }
+         if (properties.getReplyToGroupId() != null) {
+            map.put("properties.getReplyToGroupId()", properties.getReplyToGroupId());
+         }
+      }
+
       return map;
    }
 
+
+   protected static void addAnnotationsAsProperties(Map map, MessageAnnotations annotations) {
+      if (annotations != null && annotations.getValue() != null) {
+         for (Map.Entry entry : annotations.getValue().entrySet()) {
+            String key = entry.getKey().toString();
+            if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+               long deliveryTime = ((Number) entry.getValue()).longValue();
+               map.put("annotation x-opt-delivery-time", deliveryTime);
+            } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
+               long delay = ((Number) entry.getValue()).longValue();
+               if (delay > 0) {
+                  map.put("annotation x-opt-delivery-delay", System.currentTimeMillis() + delay);
+               }
+            } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
+               map.put("annotation X_OPT_INGRESS_TIME", ((Number) entry.getValue()).longValue());
+            } else {
+               try {
+                  map.put("annotation " + key, entry.getValue());
+               } catch (ActiveMQPropertyConversionException e) {
+               }
+            }
+         }
+      }
+   }
+
+
    @Override
    public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
       try {
@@ -1726,4 +1791,103 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
    public void setOwner(Object object) {
       this.owner = object;
    }
+
+
+   // *******************************************************************************************************************************
+   // Composite Data implementation
+
+   private static MessageOpenTypeFactory AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
+
+   static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      @Override
+      public Map getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage())  {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            if (m.getBody() instanceof AmqpValue) {
+               Object amqpValue = ((AmqpValue) m.getBody()).getValue();
+
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString(amqpValue.toString(), valueSizeLimit));
+            } else {
+               rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncateString("" + m.getBody(), valueSizeLimit));
+            }
+         }
+
+         return rc;
+      }
+
+      private byte getType(AMQPMessage m, Properties properties) {
+         if (m.isLargeMessage()) {
+            return DEFAULT_TYPE;
+         }
+         byte type = BYTES_TYPE;
+
+         final Symbol contentType = properties != null ? properties.getContentType() : null;
+         final String contentTypeString = contentType != null ? contentType.toString() : null;
+
+         if (m.getBody() instanceof Data) {
+
+            if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+               type = OBJECT_TYPE;
+            } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {
+               type = BYTES_TYPE;
+            } else {
+               Charset charset = getCharsetForTextualContent(contentTypeString);
+               if (StandardCharsets.UTF_8.equals(charset)) {
+                  type = TEXT_TYPE;
+               }
+            }
+         } else if (m.getBody() instanceof AmqpSequence) {
+            type = STREAM_TYPE;
+         } else if (m.getBody() instanceof AmqpValue) {
+            Object value = ((AmqpValue) m.getBody()).getValue();
+
+            if (value instanceof String) {
+               type = TEXT_TYPE;
+            } else if (value instanceof Binary) {
+
+               if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+                  type = OBJECT_TYPE;
+               } else {
+                  type = BYTES_TYPE;
+               }
+            } else if (value instanceof List) {
+               type = STREAM_TYPE;
+            } else if (value instanceof Map) {
+               type = MAP_TYPE;
+            }
+         }
+         return type;
+      }
+   }
+
+   @Override
+   public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
+      Map fields;
+      fields = AMQP_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+      return new CompositeDataSupport(AMQP_FACTORY.getCompositeType(), fields);
+   }
+
+   // Composite Data implementation
+   // *******************************************************************************************************************************
+
 }
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 5643a787ff..fbb4d78b3d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -61,9 +60,12 @@ import org.apache.activemq.artemis.selector.filter.Filterable;
 import org.apache.activemq.artemis.logs.AuditLogger;
 import org.apache.activemq.artemis.utils.JsonLoader;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.jboss.logging.Logger;
 
 public class QueueControlImpl extends AbstractControl implements QueueControl {
 
+   private static final Logger logger = Logger.getLogger(QueueControlImpl.class);
+
    public static final int FLUSH_LIMIT = 500;
 
    // Constants -----------------------------------------------------
@@ -1583,7 +1585,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                   MessageReference ref = iterator.next();
                   if (thefilter == null || thefilter.match(ref.getMessage())) {
                      if (index >= start) {
-                        c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
+                        c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
                      }
                      //we only increase the index if we add a message, otherwise we could stop before we get to a filtered message
                      index++;
@@ -1600,7 +1602,8 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
             }
             return rc;
          }
-      } catch (ActiveMQException e) {
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
          if (AuditLogger.isResourceLoggingEnabled()) {
             AuditLogger.browseMessagesFailure(queue.getName().toString());
          }
@@ -1635,7 +1638,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
                while (iterator.hasNext() && currentPageSize++ < limit) {
                   MessageReference ref = iterator.next();
                   if (thefilter == null || thefilter.match(ref.getMessage())) {
-                     c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
+                     c.add(ref.getMessage().toCompositeData(attributeSizeLimit, ref.getDeliveryCount()));
 
                   }
                }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
index a8cde039bd..2970c0c9a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
@@ -1,12 +1,12 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,56 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.activemq.artemis.core.management.impl.openmbean;
 
-public interface CompositeDataConstants {
-
-   String ADDRESS = "address";
-   String MESSAGE_ID = "messageID";
-   String USER_ID = "userID";
-   String TYPE = "type";
-   String DURABLE = "durable";
-   String EXPIRATION = "expiration";
-   String PRIORITY = "priority";
-   String REDELIVERED = "redelivered";
-   String TIMESTAMP = "timestamp";
-   String BODY = "BodyPreview";
-   String TEXT_BODY = "text";
-   String LARGE_MESSAGE = "largeMessage";
-   String PERSISTENT_SIZE = "persistentSize";
-   String PROPERTIES = "PropertiesText";
-
-   String ADDRESS_DESCRIPTION = "The Address";
-   String MESSAGE_ID_DESCRIPTION = "The message ID";
-   String USER_ID_DESCRIPTION = "The user ID";
-   String TYPE_DESCRIPTION = "The message type";
-   String DURABLE_DESCRIPTION = "Is the message durable";
-   String EXPIRATION_DESCRIPTION = "The message expiration";
-   String PRIORITY_DESCRIPTION = "The message priority";
-   String REDELIVERED_DESCRIPTION = "Has the message been redelivered";
-   String TIMESTAMP_DESCRIPTION = "The message timestamp";
-   String BODY_DESCRIPTION = "The message body";
-   String LARGE_MESSAGE_DESCRIPTION = "Is the message treated as a large message";
-   String PERSISTENT_SIZE_DESCRIPTION = "The message size when persisted on disk";
-   String PROPERTIES_DESCRIPTION = "The properties text";
-
-   // User properties
-   String STRING_PROPERTIES = "StringProperties";
-   String BOOLEAN_PROPERTIES = "BooleanProperties";
-   String BYTE_PROPERTIES = "ByteProperties";
-   String SHORT_PROPERTIES = "ShortProperties";
-   String INT_PROPERTIES = "IntProperties";
-   String LONG_PROPERTIES = "LongProperties";
-   String FLOAT_PROPERTIES = "FloatProperties";
-   String DOUBLE_PROPERTIES = "DoubleProperties";
-
-   String STRING_PROPERTIES_DESCRIPTION = "User String Properties";
-   String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties";
-   String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties";
-   String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties";
-   String INT_PROPERTIES_DESCRIPTION = "User Int Properties";
-   String LONG_PROPERTIES_DESCRIPTION = "User Long Properties";
-   String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties";
-   String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties";
+/**
+ * @deprecated use org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants
+ */
+@Deprecated
+public interface CompositeDataConstants extends org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants {
 
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
deleted file mode 100644
index 21e9437b1a..0000000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.management.impl.openmbean; - -import javax.management.openmbean.ArrayType; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.api.core.JsonUtil; -import org.apache.activemq.artemis.api.core.Message; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.server.MessageReference; - -public final class OpenTypeSupport { - - private static MessageOpenTypeFactory TEXT_FACTORY = new TextMessageOpenTypeFactory(); - private static MessageOpenTypeFactory BYTES_FACTORY = new BytesMessageOpenTypeFactory(); - - private OpenTypeSupport() { - } - - public static CompositeData convert(MessageReference ref, int valueSizeLimit) throws OpenDataException { - CompositeType ct; - - ICoreMessage message = ref.getMessage().toCore(); - - Map fields; - byte type = message.getType(); - - switch(type) { - case Message.TEXT_TYPE: - ct = TEXT_FACTORY.getCompositeType(); - fields = TEXT_FACTORY.getFields(ref, valueSizeLimit); - break; - default: - ct = BYTES_FACTORY.getCompositeType(); - fields = BYTES_FACTORY.getFields(ref, valueSizeLimit); - break; - } - return new CompositeDataSupport(ct, fields); - } - - static class MessageOpenTypeFactory { - - private CompositeType compositeType; - private final List itemNamesList = new ArrayList<>(); - private final List itemDescriptionsList = new ArrayList<>(); - private final List itemTypesList = new ArrayList<>(); - - protected TabularType stringPropertyTabularType; - protected TabularType booleanPropertyTabularType; - protected TabularType bytePropertyTabularType; - protected TabularType shortPropertyTabularType; - protected TabularType intPropertyTabularType; - protected TabularType longPropertyTabularType; - protected TabularType floatPropertyTabularType; - protected TabularType doublePropertyTabularType; - protected Object[][] typedPropertyFields; - - protected String getTypeName() { - return Message.class.getName(); - } - - public CompositeType getCompositeType() throws OpenDataException { - if (compositeType == null) { - init(); - compositeType = createCompositeType(); - } - return compositeType; - } - - protected void init() throws OpenDataException { - - addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING); - addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING); - addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING); - addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE); - addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN); - addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG); - addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE); - addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN); - addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG); - addItem(CompositeDataConstants.LARGE_MESSAGE, CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN); - addItem(CompositeDataConstants.PERSISTENT_SIZE, CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG); - - addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING); - - // now lets expose the type safe properties - stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING); - booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN); - bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE); - shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT); - intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER); - longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG); - floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT); - doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE); - - addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType); - addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType); - addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType); - addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType); - addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType); - addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); - addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); - addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); - - typedPropertyFields = new Object[][] { - {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class}, - {CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class}, - {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class}, - {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class}, - {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class}, - {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class}, - {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class}, - {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class} - }; - - } - - public Map getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { - Map rc = new HashMap<>(); - ICoreMessage m = ref.getMessage().toCore(); - rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); - if (m.getUserID() != null) { - rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString()); - } else { - rc.put(CompositeDataConstants.USER_ID, ""); - } - rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString()); - rc.put(CompositeDataConstants.TYPE, m.getType()); - rc.put(CompositeDataConstants.DURABLE, m.isDurable()); - rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration()); - rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp()); - rc.put(CompositeDataConstants.PRIORITY, m.getPriority()); - rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() > 1); - rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage()); - try { - rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize()); - } catch (final ActiveMQException e1) { - rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1); - } - - Map propertyMap = m.toPropertyMap(valueSizeLimit); - - rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit)); - - // only populate if there are some values - TabularDataSupport tabularData; - for (Object[] typedPropertyInfo : typedPropertyFields) { - tabularData = null; - try { - tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]); - } catch (Exception ignored) { - } - if (tabularData != null && !tabularData.isEmpty()) { - rc.put((String) typedPropertyInfo[0], tabularData); - } else { - rc.put((String) typedPropertyInfo[0], null); - } - } - return rc; - } - - protected String toString(Object value) { - if (value == null) { - return null; - } - return value.toString(); - } - - protected CompositeType createCompositeType() throws OpenDataException { - String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]); - String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]); - OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]); - return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes); - } - - protected String getDescription() { - return getTypeName(); - } - - protected TabularType createTabularType(Class type, OpenType openType) throws OpenDataException { - String typeName = "java.util.Map"; - String[] keyValue = new String[]{"key", "value"}; - OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType}; - CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes); - return new TabularType(typeName, typeName, rowType, new String[]{"key"}); - } - - protected TabularDataSupport createTabularData(Map entries, - TabularType type, - Class valueType) throws IOException, OpenDataException { - TabularDataSupport answer = new TabularDataSupport(type); - - for (String key : entries.keySet()) { - Object value = entries.get(key); - if (valueType.isInstance(value)) { - CompositeDataSupport compositeData = createTabularRowValue(type, key, value); - answer.put(compositeData); - } else if (valueType == String.class && value instanceof SimpleString) { - CompositeDataSupport compositeData = createTabularRowValue(type, key, value.toString()); - answer.put(compositeData); - } - } - return answer; - } - - protected CompositeDataSupport createTabularRowValue(TabularType type, - String key, - Object value) throws OpenDataException { - Map fields = new HashMap<>(); - fields.put("key", key); - fields.put("value", value); - return new CompositeDataSupport(type.getRowType(), fields); - } - - protected void addItem(String name, String description, OpenType type) { - itemNamesList.add(name); - itemDescriptionsList.add(description); - itemTypesList.add(type); - } - } - - - static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory { - protected ArrayType body; - - @Override - protected void init() throws OpenDataException { - super.init(); - body = new ArrayType(SimpleType.BYTE, true); - addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body); - } - - @Override - public Map getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { - Map rc = super.getFields(ref, valueSizeLimit); - ICoreMessage m = ref.getMessage().toCore(); - if (!m.isLargeMessage()) { - ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); - byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; - bodyCopy.readBytes(bytes); - rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); - } else { - rc.put(CompositeDataConstants.BODY, new byte[0]); - } - return rc; - } - } - - static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory { - protected SimpleType text; - - @Override - protected void init() throws OpenDataException { - super.init(); - addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING); - } - - @Override - public Map getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { - Map rc = super.getFields(ref, valueSizeLimit); - ICoreMessage m = ref.getMessage().toCore(); - if (!m.isLargeMessage()) { - if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) { - rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]"); - } else { - SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); - rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text.toString(), valueSizeLimit) : ""); - } - } else { - rc.put(CompositeDataConstants.TEXT_BODY, "[large message]"); - } - return rc; - } - } -} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java index 1839fe534b..877237d7bb 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java @@ -20,7 +20,6 @@ package org.apache.activemq.artemis.core.management.impl.openmbean; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; -import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl; import org.apache.activemq.artemis.reader.TextMessageUtil; import org.junit.Assert; import org.junit.Test; @@ -39,7 +38,7 @@ public class OpenTypeSupportTest { TextMessageUtil.writeBodyText(coreMessage.getBodyBuffer(), SimpleString.toSimpleString(bodyText)); - CompositeData cd = OpenTypeSupport.convert(new MessageReferenceImpl(coreMessage, null), 256); + CompositeData cd = coreMessage.toCompositeData(256, 1); Assert.assertEquals(bodyText, cd.get("text")); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index d45bcffb06..66dbcffe2f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.management; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; import javax.json.JsonArray; import javax.json.JsonObject; import javax.management.Notification; @@ -70,10 +76,12 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.RandomUtil; @@ -86,8 +94,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY; -import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.STRING_PROPERTIES; +import static org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.BODY; +import static org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.STRING_PROPERTIES; @RunWith(value = Parameterized.class) public class QueueControlTest extends ManagementTestBase { @@ -3447,6 +3455,123 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertEquals(new String(body), "theBody"); } + + @Test + public void testSendMessageWithAMQP() throws Exception { + SimpleString address = new SimpleString("address_testSendMessageWithAMQP"); + SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP"); + + server.addAddressInfo(new AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server.locateQueue(queue) != null && server.getAddressInfo(address) != null); + + QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage("theAMQPBody"); + message.setStringProperty("protocolUsed", "amqp"); + producer.send(message); + } + } + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage("theCoreBody"); + message.setStringProperty("protocolUsed", "core"); + producer.send(message); + } + } + + Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100); + + // the message IDs are set on the server + CompositeData[] browse = queueControl.browse(null); + + Assert.assertEquals(2, browse.length); + + String body = (String) browse[0].get("text"); + + Assert.assertNotNull(body); + + Assert.assertEquals("theAMQPBody", body); + + body = (String) browse[1].get("text"); + + Assert.assertNotNull(body); + + Assert.assertEquals("theCoreBody", body); + + } + + + @Test + public void testSendMessageWithAMQPLarge() throws Exception { + SimpleString address = new SimpleString("address_testSendMessageWithAMQP"); + SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP"); + + server.addAddressInfo(new AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST)); + + Wait.assertTrue(() -> server.locateQueue(queue) != null && server.getAddressInfo(address) != null); + + QueueControl queueControl = createManagementControl(address, queue, RoutingType.ANYCAST); + + StringBuffer bufferLarge = new StringBuffer(); + for (int i = 0; i < 100 * 1024; i++) { + bufferLarge.append("*-"); + } + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage(bufferLarge.toString()); + message.setStringProperty("protocolUsed", "amqp"); + producer.send(message); + } + } + + { // a namespace + ConnectionFactory factory = CFUtil.createConnectionFactory("core", "tcp://localhost:61616"); + try (Connection connection = factory.createConnection("myUser", "myPassword")) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(address.toString())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = session.createTextMessage(bufferLarge.toString()); + message.setStringProperty("protocolUsed", "core"); + producer.send(message); + } + } + + Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100); + + // the message IDs are set on the server + CompositeData[] browse = queueControl.browse(null); + + Assert.assertEquals(2, browse.length); + + String body = (String) browse[0].get("text"); + + Assert.assertNotNull(body); + + body = (String) browse[1].get("text"); + + Assert.assertNotNull(body); + + } + @Test public void testSendMessageWithMessageId() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -3763,7 +3888,7 @@ public class QueueControlTest extends ManagementTestBase { @Before public void setUp() throws Exception { super.setUp(); - Configuration conf = createDefaultInVMConfig().setJMXManagementEnabled(true); + Configuration conf = createDefaultConfig(true).setJMXManagementEnabled(true); server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer, true)); server.start();