diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java index d367985037..71d4f5f2f0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java @@ -102,10 +102,11 @@ public interface ICoreMessage extends Message { /** * @return Returns the message in Map form, useful when encoding to JSON + * @param valueSizeLimit */ @Override - default Map toMap() { - Map map = toPropertyMap(); + default Map toMap(int valueSizeLimit) { + Map map = toPropertyMap(valueSizeLimit); map.put("messageID", getMessageID()); Object userID = getUserID(); if (getUserID() != null) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java index e017be2951..0576cc9fe5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java @@ -28,6 +28,7 @@ import javax.management.openmbean.CompositeDataSupport; import java.io.ByteArrayInputStream; import java.io.StringReader; import java.lang.reflect.Array; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,52 +46,7 @@ public final class JsonUtil { JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder(); for (Object parameter : array) { - if (parameter instanceof Map) { - Map map = (Map) parameter; - - JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder(); - - for (Map.Entry entry : map.entrySet()) { - String key = entry.getKey(); - - Object val = entry.getValue(); - - if (val != null) { - if (val.getClass().isArray()) { - JsonArray objectArray = toJSONArray((Object[]) val); - jsonObject.add(key, objectArray); - } else { - addToObject(key, val, jsonObject); - } - } - } - jsonArray.add(jsonObject); - } else { - if (parameter != null) { - Class clz = parameter.getClass(); - - if (clz.isArray()) { - Object[] innerArray = (Object[]) parameter; - - if (innerArray instanceof CompositeData[]) { - JsonArrayBuilder innerJsonArray = JsonLoader.createArrayBuilder(); - for (Object data : innerArray) { - String s = Base64.encodeObject((CompositeDataSupport) data); - innerJsonArray.add(s); - } - JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder(); - jsonObject.add(CompositeData.class.getName(), innerJsonArray); - jsonArray.add(jsonObject); - } else { - jsonArray.add(toJSONArray(innerArray)); - } - } else { - addToArray(parameter, jsonArray); - } - } else { - jsonArray.addNull(); - } - } + addToArray(parameter, jsonArray); } return jsonArray.build(); } @@ -210,6 +166,12 @@ public final class JsonUtil { } else if (param instanceof byte[]) { JsonArrayBuilder byteArrayObject = toJsonArrayBuilder((byte[]) param); jsonObjectBuilder.add(key, byteArrayObject); + } else if (param instanceof Object[]) { + final JsonArrayBuilder objectArrayBuilder = JsonLoader.createArrayBuilder(); + for (Object parameter : (Object[])param) { + addToArray(parameter, objectArrayBuilder); + } + jsonObjectBuilder.add(key, objectArrayBuilder); } else { throw ActiveMQClientMessageBundle.BUNDLE.invalidManagementParam(param.getClass().getName()); } @@ -238,6 +200,21 @@ public final class JsonUtil { } else if (param instanceof byte[]) { JsonArrayBuilder byteArrayObject = toJsonArrayBuilder((byte[]) param); jsonArrayBuilder.add(byteArrayObject); + } else if (param instanceof CompositeData[]) { + JsonArrayBuilder innerJsonArray = JsonLoader.createArrayBuilder(); + for (Object data : (CompositeData[])param) { + String s = Base64.encodeObject((CompositeDataSupport) data); + innerJsonArray.add(s); + } + JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder(); + jsonObject.add(CompositeData.class.getName(), innerJsonArray); + jsonArrayBuilder.add(jsonObject); + } else if (param instanceof Object[]) { + JsonArrayBuilder objectArrayBuilder = JsonLoader.createArrayBuilder(); + for (Object parameter : (Object[])param) { + addToArray(parameter, objectArrayBuilder); + } + jsonArrayBuilder.add(objectArrayBuilder); } else { throw ActiveMQClientMessageBundle.BUNDLE.invalidManagementParam(param.getClass().getName()); } @@ -345,6 +322,29 @@ public final class JsonUtil { private JsonUtil() { } + public static Object truncate(final Object value, final int valueSizeLimit) { + Object result = value; + if (valueSizeLimit >= 0) { + if (String.class.equals(value.getClass())) { + String str = (String) value; + if (str.length() > valueSizeLimit) { + result = new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString(); + } + } else if (value.getClass().isArray()) { + if (byte[].class.equals(value.getClass())) { + if (((byte[]) value).length > valueSizeLimit) { + result = Arrays.copyOfRange((byte[]) value, 0, valueSizeLimit); + } + } else if (char[].class.equals(value.getClass())) { + if (((char[]) value).length > valueSizeLimit) { + result = Arrays.copyOfRange((char[]) value, 0, valueSizeLimit); + } + } + } + } + return result; + } + private static class NullableJsonString implements JsonValue, JsonString { private final String value; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 249374798b..b5213e9775 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -708,7 +708,15 @@ public interface Message { * @return Returns the message in Map form, useful when encoding to JSON */ default Map toMap() { - Map map = toPropertyMap(); + return toMap(-1); + } + + /** + * @return Returns the message in Map form, useful when encoding to JSON + * @param valueSizeLimit that limits [] map values + */ + default Map toMap(int valueSizeLimit) { + Map map = toPropertyMap(valueSizeLimit); map.put("messageID", getMessageID()); Object userID = getUserID(); if (getUserID() != null) { @@ -728,6 +736,14 @@ public interface Message { * @return Returns the message properties in Map form, useful when encoding to JSON */ default Map toPropertyMap() { + return toPropertyMap(-1); + } + + /** + * @return Returns the message properties in Map form, useful when encoding to JSON + * @param valueSizeLimit that limits [] map values + */ + default Map toPropertyMap(int valueSizeLimit) { Map map = new HashMap<>(); for (SimpleString name : getPropertyNames()) { Object value = getObjectProperty(name.toString()); @@ -735,12 +751,12 @@ public interface Message { if (value instanceof SimpleString) { value = value.toString(); } + value = JsonUtil.truncate(value, valueSizeLimit); map.put(name.toString(), value); } return map; } - /** This should make you convert your message into Core format. */ ICoreMessage toCore(); diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java index 0735e1ea0a..c52daa411b 100644 --- a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMessageTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.message; +import java.nio.charset.StandardCharsets; import java.util.LinkedList; import io.netty.buffer.ByteBuf; @@ -211,6 +212,32 @@ public class CoreMessageTest { } } + @Test + public void testToMapLimit() throws Exception { + + CoreMessage coreMessage = new CoreMessage().initBuffer(BIGGER_TEXT.length() * 2); + coreMessage.putStringProperty("prop", BIGGER_TEXT); + coreMessage.putBytesProperty("bytesProp", BIGGER_TEXT.getBytes(StandardCharsets.UTF_8)); + + Assert.assertEquals(BIGGER_TEXT.length(), ((String)coreMessage.toMap().get("prop")).length()); + Assert.assertEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toMap().get("bytesProp")).length); + + // limit the values + Assert.assertNotEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toMap(40).get("bytesProp")).length); + String mapVal = ((String)coreMessage.toMap(40).get("prop")); + Assert.assertNotEquals(BIGGER_TEXT.length(), mapVal.length()); + Assert.assertTrue(mapVal.contains("more")); + + mapVal = ((String)coreMessage.toMap(0).get("prop")); + Assert.assertNotEquals(BIGGER_TEXT.length(), mapVal.length()); + Assert.assertTrue(mapVal.contains("more")); + + Assert.assertEquals(BIGGER_TEXT.length(), Integer.parseInt(mapVal.substring(mapVal.lastIndexOf('+') + 1, mapVal.lastIndexOf('m')).trim())); + + Assert.assertEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toPropertyMap().get("bytesProp")).length); + Assert.assertNotEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toPropertyMap(40).get("bytesProp")).length); + } + @Test public void testSaveReceiveLimitedBytes() { CoreMessage empty = new CoreMessage().initBuffer(100); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 25e983c56a..f32c285b35 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -835,7 +836,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. public abstract int getMemoryEstimate(); @Override - public Map toPropertyMap() { + public Map toPropertyMap(int valueSizeLimit) { Map map = new HashMap<>(); for (SimpleString name : getPropertyNames()) { Object value = getObjectProperty(name.toString()); @@ -843,6 +844,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. if (value instanceof Binary) { value = ((Binary)value).getArray(); } + value = JsonUtil.truncate(value, valueSizeLimit); map.put(name.toString(), value); } return map; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 6994860c47..f6e3019a82 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -281,6 +281,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size"; + private static final String MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = "management-message-attribute-size-limit"; + private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections"; private static final String MAX_QUEUES_NODE_NAME = "max-queues"; @@ -1250,6 +1252,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { addressSettings.setConfigDeleteAddresses(policy); } else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) { addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child)); + } else if (MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT.equalsIgnoreCase(name)) { + addressSettings.setManagementMessageAttributeSizeLimit(XMLUtil.parseInt(child)); } else if (DEFAULT_PURGE_ON_NO_CONSUMERS.equalsIgnoreCase(name)) { addressSettings.setDefaultPurgeOnNoConsumers(XMLUtil.parseBoolean(child)); } else if (DEFAULT_MAX_CONSUMERS.equalsIgnoreCase(name)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java index 4e562129bf..67f75496a1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java @@ -788,11 +788,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { * @return */ private Map[] convertMessagesToMaps(List refs) throws ActiveMQException { + final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit(); Map[] messages = new Map[refs.size()]; int i = 0; for (MessageReference ref : refs) { Message message = ref.getMessage(); - messages[i++] = message.toMap(); + messages[i++] = message.toMap(attributeSizeLimit); } return messages; } @@ -847,7 +848,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { Filter filter = FilterImpl.createFilter(filterStr); List> messages = new ArrayList<>(); queue.flushExecutor(); - final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize(); + final AddressSettings addressSettings = addressSettingsRepository.getMatch(address); + final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit(); + final int limit = addressSettings.getManagementBrowsePageSize(); int count = 0; try (LinkedListIterator iterator = queue.browserIterator()) { try { @@ -855,7 +858,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { MessageReference ref = iterator.next(); if (filter == null || filter.match(ref.getMessage())) { Message message = ref.getMessage(); - messages.add(message.toMap()); + messages.add(message.toMap(attributeSizeLimit)); } } } catch (NoSuchElementException ignored) { @@ -895,12 +898,13 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { try { List> messages = new ArrayList<>(); queue.flushExecutor(); + final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit(); try (LinkedListIterator iterator = queue.browserIterator()) { // returns just the first, as it's the first only if (iterator.hasNext()) { MessageReference ref = iterator.next(); Message message = ref.getMessage(); - messages.add(message.toMap()); + messages.add(message.toMap(attributeSizeLimit)); } return messages.toArray(new Map[1]); } @@ -985,7 +989,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { if (filter == null && groupByProperty == null) { result.put(null, getMessageCount()); } else { - final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize(); + final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize(); int count = 0; try (LinkedListIterator iterator = queue.browserIterator()) { try { @@ -1552,13 +1556,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { Filter thefilter = FilterImpl.createFilter(filter); queue.flushExecutor(); + final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit(); try (LinkedListIterator iterator = queue.browserIterator()) { try { while (iterator.hasNext() && index < end) { MessageReference ref = iterator.next(); if (thefilter == null || thefilter.match(ref.getMessage())) { if (index >= start) { - c.add(OpenTypeSupport.convert(ref)); + c.add(OpenTypeSupport.convert(ref, attributeSizeLimit)); } } index++; @@ -1597,7 +1602,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { clearIO(); try { - int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize(); + final AddressSettings addressSettings = addressSettingsRepository.getMatch(address); + final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit(); + final int limit = addressSettings.getManagementBrowsePageSize(); int currentPageSize = 0; ArrayList c = new ArrayList<>(); Filter thefilter = FilterImpl.createFilter(filter); @@ -1607,7 +1614,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl { while (iterator.hasNext() && currentPageSize++ < limit) { MessageReference ref = iterator.next(); if (thefilter == null || thefilter.match(ref.getMessage())) { - c.add(OpenTypeSupport.convert(ref)); + c.add(OpenTypeSupport.convert(ref, attributeSizeLimit)); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java index 027bbdb608..7c5bd6502e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java @@ -34,6 +34,7 @@ import java.util.Map; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.MessageReference; @@ -46,7 +47,7 @@ public final class OpenTypeSupport { private OpenTypeSupport() { } - public static CompositeData convert(MessageReference ref) throws OpenDataException { + public static CompositeData convert(MessageReference ref, int valueSizeLimit) throws OpenDataException { CompositeType ct; ICoreMessage message = ref.getMessage().toCore(); @@ -57,11 +58,11 @@ public final class OpenTypeSupport { switch(type) { case Message.TEXT_TYPE: ct = TEXT_FACTORY.getCompositeType(); - fields = TEXT_FACTORY.getFields(ref); + fields = TEXT_FACTORY.getFields(ref, valueSizeLimit); break; default: ct = BYTES_FACTORY.getCompositeType(); - fields = BYTES_FACTORY.getFields(ref); + fields = BYTES_FACTORY.getFields(ref, valueSizeLimit); break; } return new CompositeDataSupport(ct, fields); @@ -82,6 +83,7 @@ public final class OpenTypeSupport { protected TabularType longPropertyTabularType; protected TabularType floatPropertyTabularType; protected TabularType doublePropertyTabularType; + protected Object[][] typedPropertyFields; protected String getTypeName() { return Message.class.getName(); @@ -129,9 +131,21 @@ public final class OpenTypeSupport { addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType); addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType); addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType); + + typedPropertyFields = new Object[][] { + {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class}, + {CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class}, + {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class}, + {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class}, + {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class}, + {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class}, + {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class}, + {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class} + }; + } - public Map getFields(MessageReference ref) throws OpenDataException { + public Map getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { Map rc = new HashMap<>(); ICoreMessage m = ref.getMessage().toCore(); rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID()); @@ -154,49 +168,23 @@ public final class OpenTypeSupport { rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1); } - Map propertyMap = m.toPropertyMap(); + Map propertyMap = m.toPropertyMap(valueSizeLimit); - rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap); + rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit)); - try { - rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(propertyMap, stringPropertyTabularType, String.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType)); - } - try { - rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(propertyMap, booleanPropertyTabularType, Boolean.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType)); - } - try { - rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(propertyMap, bytePropertyTabularType, Byte.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType)); - } - try { - rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(propertyMap, shortPropertyTabularType, Short.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType)); - } - try { - rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(propertyMap, intPropertyTabularType, Integer.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType)); - } - try { - rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(propertyMap, longPropertyTabularType, Long.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType)); - } - try { - rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(propertyMap, floatPropertyTabularType, Float.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType)); - } - try { - rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(propertyMap, doublePropertyTabularType, Double.class)); - } catch (IOException e) { - rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType)); + // only populate if there are some values + TabularDataSupport tabularData; + for (Object[] typedPropertyInfo : typedPropertyFields) { + tabularData = null; + try { + tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]); + } catch (Exception ignored) { + } + if (tabularData != null && !tabularData.isEmpty()) { + rc.put((String) typedPropertyInfo[0], tabularData); + } else { + rc.put((String) typedPropertyInfo[0], null); + } } return rc; } @@ -273,14 +261,14 @@ public final class OpenTypeSupport { } @Override - public Map getFields(MessageReference ref) throws OpenDataException { - Map rc = super.getFields(ref); + public Map getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { + Map rc = super.getFields(ref, valueSizeLimit); ICoreMessage m = ref.getMessage().toCore(); if (!m.isLargeMessage()) { ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer(); - byte[] bytes = new byte[bodyCopy.readableBytes()]; + byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1]; bodyCopy.readBytes(bytes); - rc.put(CompositeDataConstants.BODY, bytes); + rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit)); } else { rc.put(CompositeDataConstants.BODY, new byte[0]); } @@ -298,15 +286,15 @@ public final class OpenTypeSupport { } @Override - public Map getFields(MessageReference ref) throws OpenDataException { - Map rc = super.getFields(ref); + public Map getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException { + Map rc = super.getFields(ref, valueSizeLimit); ICoreMessage m = ref.getMessage().toCore(); if (!m.isLargeMessage()) { if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) { rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]"); } else { - SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString(); - rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : ""); + final String text = m.getReadOnlyBodyBuffer().readString(); + rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text, valueSizeLimit) : ""); } } else { rc.put(CompositeDataConstants.TEXT_BODY, "[large message]"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index 567f679408..44aa2996b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -127,6 +127,8 @@ public class AddressSettings implements Mergeable, Serializable public static final boolean DEFAULT_ENABLE_METRICS = true; + public static final int MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = 256; + private AddressFullMessagePolicy addressFullMessagePolicy = null; private Long maxSizeBytes = null; @@ -253,6 +255,8 @@ public class AddressSettings implements Mergeable, Serializable private Boolean enableMetrics = null; + private Integer managementMessageAttributeSizeLimit = null; + //from amq5 //make it transient private transient Integer queuePrefetch = null; @@ -318,6 +322,7 @@ public class AddressSettings implements Mergeable, Serializable this.defaultGroupFirstKey = other.defaultGroupFirstKey; this.defaultRingSize = other.defaultRingSize; this.enableMetrics = other.enableMetrics; + this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit; } public AddressSettings() { @@ -914,6 +919,15 @@ public class AddressSettings implements Mergeable, Serializable return this; } + public int getManagementMessageAttributeSizeLimit() { + return managementMessageAttributeSizeLimit != null ? managementMessageAttributeSizeLimit : AddressSettings.MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT; + } + + public AddressSettings setManagementMessageAttributeSizeLimit(int managementMessageAttributeSizeLimit) { + this.managementMessageAttributeSizeLimit = managementMessageAttributeSizeLimit; + return this; + } + /** * merge 2 objects in to 1 * @@ -1029,6 +1043,9 @@ public class AddressSettings implements Mergeable, Serializable if (managementBrowsePageSize == null) { managementBrowsePageSize = merged.managementBrowsePageSize; } + if (managementMessageAttributeSizeLimit == null) { + managementMessageAttributeSizeLimit = merged.managementMessageAttributeSizeLimit; + } if (queuePrefetch == null) { queuePrefetch = merged.queuePrefetch; } @@ -1320,6 +1337,10 @@ public class AddressSettings implements Mergeable, Serializable defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer); } + if (buffer.readableBytes() > 0) { + managementMessageAttributeSizeLimit = BufferHelper.readNullableInteger(buffer); + } + } @Override @@ -1383,7 +1404,8 @@ public class AddressSettings implements Mergeable, Serializable SimpleString.sizeofNullableString(expiryQueuePrefix) + SimpleString.sizeofNullableString(expiryQueueSuffix) + BufferHelper.sizeOfNullableBoolean(enableMetrics) + - BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch); + BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) + + BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit); } @Override @@ -1510,6 +1532,7 @@ public class AddressSettings implements Mergeable, Serializable BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch); + BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit); } /* (non-Javadoc) @@ -1581,6 +1604,7 @@ public class AddressSettings implements Mergeable, Serializable result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode()); result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode()); result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode()); + result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode()); return result; } @@ -1796,6 +1820,11 @@ public class AddressSettings implements Mergeable, Serializable return false; } else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize)) return false; + if (managementMessageAttributeSizeLimit == null) { + if (other.managementMessageAttributeSizeLimit != null) + return false; + } else if (!managementMessageAttributeSizeLimit.equals(other.managementMessageAttributeSizeLimit)) + return false; if (queuePrefetch == null) { if (other.queuePrefetch != null) return false; @@ -2017,6 +2046,8 @@ public class AddressSettings implements Mergeable, Serializable configDeleteAddresses + ", managementBrowsePageSize=" + managementBrowsePageSize + + ", managementMessageAttributeSizeLimit=" + + managementMessageAttributeSizeLimit + ", defaultMaxConsumers=" + defaultMaxConsumers + ", defaultPurgeOnNoConsumers=" + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java index e0ea8910d6..259d1bfba1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/CoreTransactionDetail.java @@ -56,6 +56,6 @@ public class CoreTransactionDetail extends TransactionDetail { @Override public Map decodeMessageProperties(Message msg) { - return msg.toMap(); + return msg.toMap(256); } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 6788727bb4..fed309a846 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -3804,7 +3804,15 @@ - how many message a management resource can browse + how many message a management resource can browse, list or filter + + + + + + + + the size limit of any message attribute value returned from a browse ,list or filter. Attribute values that exceed with be truncated diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml index 8d5560dbb4..b6ed421b3b 100644 --- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml @@ -505,6 +505,8 @@ 10000 10 false + 400 + 265 diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java index 128636a3a0..2429cc69e8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java @@ -39,6 +39,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.json.JsonArray; import javax.json.JsonObject; +import java.nio.charset.StandardCharsets; import java.util.Map; public class JMXManagementTest extends JMSClientTestSupport { @@ -107,8 +108,11 @@ public class JMXManagementTest extends JMSClientTestSupport { session.begin(); AmqpMessage message = new AmqpMessage(); message.setApplicationProperty("TEST_BINARY", new Binary("TEST".getBytes())); - message.setApplicationProperty("TEST_STRING", "TEST"); - message.setText("TEST"); + + final String oneK = new String(new char[1024]).replace("\0", "$"); + message.setApplicationProperty("TEST_BIG_BINARY", new Binary(oneK.getBytes(StandardCharsets.UTF_8))); + message.setApplicationProperty("TEST_STRING", oneK); + message.setText("NOT_VISIBLE"); sender.send(message); session.commit(); @@ -116,6 +120,18 @@ public class JMXManagementTest extends JMSClientTestSupport { QueueControl queueControl = createManagementControl(queue, queue); String firstMessageAsJSON = queueControl.getFirstMessageAsJSON(); Assert.assertNotNull(firstMessageAsJSON); + + // Json is still bulky! + Assert.assertTrue(firstMessageAsJSON.length() < 1500); + Assert.assertFalse(firstMessageAsJSON.contains("NOT_VISIBLE")); + + // composite data limits + Map[] result = queueControl.listMessages(""); + assertEquals(1, result.length); + + final Map msgMap = result[0]; + Assert.assertTrue(msgMap.get("TEST_STRING").toString().length() < 512); + } finally { connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index b04b845e49..457c379f96 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -488,6 +488,122 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + @Test + public void testMessageAttributeLimits() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(100); + server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings); + + session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable)); + + byte[] twoKBytes = new byte[2048]; + for (int i = 0; i < 2048; i++) { + twoKBytes[i] = '*'; + } + + String twoKString = new String(twoKBytes); + + ClientMessage clientMessage = session.createMessage(false); + + clientMessage.putStringProperty("y", "valueY"); + clientMessage.putStringProperty("bigString", twoKString); + clientMessage.putBytesProperty("bigBytes", twoKBytes); + clientMessage.putObjectProperty("bigObject", twoKString); + + clientMessage.getBodyBuffer().writeBytes(twoKBytes); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, getMessageCount(queueControl)); + + ClientProducer producer = session.createProducer(address); + producer.send(clientMessage); + + Wait.assertEquals(1, () -> getMessageCount(queueControl)); + + assertTrue(server.getPagingManager().getPageStore(address).getAddressSize() > 2048); + + Map[] messages = queueControl.listMessages(""); + assertEquals(1, messages.length); + for (String key : messages[0].keySet()) { + Object value = messages[0].get(key); + System.err.println( key + " " + value); + assertTrue(value.toString().length() <= 150); + + if (value instanceof byte[]) { + assertTrue(((byte[])value).length <= 150); + } + } + + String all = queueControl.listMessagesAsJSON(""); + assertTrue(all.length() < 1024); + + String first = queueControl.getFirstMessageAsJSON(); + assertTrue(first.length() < 1024); + + CompositeData[] browseResult = queueControl.browse(1, 100); + for (CompositeData compositeData : browseResult) { + for (String key : compositeData.getCompositeType().keySet()) { + Object value = compositeData.get(key); + System.err.println("" + key + ", " + value); + + if (value != null) { + + if (key.equals("StringProperties")) { + // these are very verbose composite data structures + assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 2048); + } else { + assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 512); + } + + if (value instanceof byte[]) { + assertTrue("truncated? " + key, ((byte[]) value).length <= 150); + } + } + } + } + + session.deleteQueue(queue); + } + + @Test + public void testTextMessageAttributeLimits() throws Exception { + SimpleString address = RandomUtil.randomSimpleString(); + SimpleString queue = RandomUtil.randomSimpleString(); + + AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(10); + server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings); + + session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable)); + + final String twentyBytes = new String(new char[20]).replace("\0", "#"); + + ClientMessage clientMessage = createTextMessage(session, twentyBytes, true); + clientMessage.putStringProperty("x", twentyBytes); + + QueueControl queueControl = createManagementControl(address, queue); + Assert.assertEquals(0, getMessageCount(queueControl)); + + ClientProducer producer = session.createProducer(address); + producer.send(clientMessage); + + Wait.assertEquals(1, () -> getMessageCount(queueControl)); + + Map[] messages = queueControl.listMessages(""); + assertEquals(1, messages.length); + assertTrue("truncated? ", ((String)messages[0].get("x")).contains("more")); + + CompositeData[] browseResult = queueControl.browse(1, 100); + for (CompositeData compositeData : browseResult) { + for (String key : new String[] {"text", "PropertiesText", "StringProperties"}) { + assertTrue("truncated? : " + key, compositeData.get(key).toString().contains("more")); + } + } + + session.deleteQueue(queue); + } + @Test public void testGetMessagesAdded() throws Exception { SimpleString address = RandomUtil.randomSimpleString(); @@ -1082,7 +1198,7 @@ public class QueueControlTest extends ManagementTestBase { QueueControl queueControl = createManagementControl(address, queue); ClientProducer producer = session.createProducer(address); - producer.send(session.createMessage(durable)); + producer.send(session.createMessage(durable).putBytesProperty("bytes", new byte[]{'%'})); producer.send(session.createMessage(durable)); Wait.assertEquals(2, () -> queueControl.listMessages(null).length);