ARTEMIS-3175 - implement address setting management-message-attribute-size-limit to sensibly limit data returned by list/browse/filter management ops
This commit is contained in:
parent
7c5ef2796a
commit
02bb7031c2
|
@ -102,10 +102,11 @@ public interface ICoreMessage extends Message {
|
|||
|
||||
/**
|
||||
* @return Returns the message in Map form, useful when encoding to JSON
|
||||
* @param valueSizeLimit
|
||||
*/
|
||||
@Override
|
||||
default Map<String, Object> toMap() {
|
||||
Map map = toPropertyMap();
|
||||
default Map<String, Object> toMap(int valueSizeLimit) {
|
||||
Map map = toPropertyMap(valueSizeLimit);
|
||||
map.put("messageID", getMessageID());
|
||||
Object userID = getUserID();
|
||||
if (getUserID() != null) {
|
||||
|
|
|
@ -28,6 +28,7 @@ import javax.management.openmbean.CompositeDataSupport;
|
|||
import java.io.ByteArrayInputStream;
|
||||
import java.io.StringReader;
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -45,53 +46,8 @@ public final class JsonUtil {
|
|||
JsonArrayBuilder jsonArray = JsonLoader.createArrayBuilder();
|
||||
|
||||
for (Object parameter : array) {
|
||||
if (parameter instanceof Map) {
|
||||
Map<String, Object> map = (Map<String, Object>) parameter;
|
||||
|
||||
JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder();
|
||||
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
String key = entry.getKey();
|
||||
|
||||
Object val = entry.getValue();
|
||||
|
||||
if (val != null) {
|
||||
if (val.getClass().isArray()) {
|
||||
JsonArray objectArray = toJSONArray((Object[]) val);
|
||||
jsonObject.add(key, objectArray);
|
||||
} else {
|
||||
addToObject(key, val, jsonObject);
|
||||
}
|
||||
}
|
||||
}
|
||||
jsonArray.add(jsonObject);
|
||||
} else {
|
||||
if (parameter != null) {
|
||||
Class<?> clz = parameter.getClass();
|
||||
|
||||
if (clz.isArray()) {
|
||||
Object[] innerArray = (Object[]) parameter;
|
||||
|
||||
if (innerArray instanceof CompositeData[]) {
|
||||
JsonArrayBuilder innerJsonArray = JsonLoader.createArrayBuilder();
|
||||
for (Object data : innerArray) {
|
||||
String s = Base64.encodeObject((CompositeDataSupport) data);
|
||||
innerJsonArray.add(s);
|
||||
}
|
||||
JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder();
|
||||
jsonObject.add(CompositeData.class.getName(), innerJsonArray);
|
||||
jsonArray.add(jsonObject);
|
||||
} else {
|
||||
jsonArray.add(toJSONArray(innerArray));
|
||||
}
|
||||
} else {
|
||||
addToArray(parameter, jsonArray);
|
||||
}
|
||||
} else {
|
||||
jsonArray.addNull();
|
||||
}
|
||||
}
|
||||
}
|
||||
return jsonArray.build();
|
||||
}
|
||||
|
||||
|
@ -210,6 +166,12 @@ public final class JsonUtil {
|
|||
} else if (param instanceof byte[]) {
|
||||
JsonArrayBuilder byteArrayObject = toJsonArrayBuilder((byte[]) param);
|
||||
jsonObjectBuilder.add(key, byteArrayObject);
|
||||
} else if (param instanceof Object[]) {
|
||||
final JsonArrayBuilder objectArrayBuilder = JsonLoader.createArrayBuilder();
|
||||
for (Object parameter : (Object[])param) {
|
||||
addToArray(parameter, objectArrayBuilder);
|
||||
}
|
||||
jsonObjectBuilder.add(key, objectArrayBuilder);
|
||||
} else {
|
||||
throw ActiveMQClientMessageBundle.BUNDLE.invalidManagementParam(param.getClass().getName());
|
||||
}
|
||||
|
@ -238,6 +200,21 @@ public final class JsonUtil {
|
|||
} else if (param instanceof byte[]) {
|
||||
JsonArrayBuilder byteArrayObject = toJsonArrayBuilder((byte[]) param);
|
||||
jsonArrayBuilder.add(byteArrayObject);
|
||||
} else if (param instanceof CompositeData[]) {
|
||||
JsonArrayBuilder innerJsonArray = JsonLoader.createArrayBuilder();
|
||||
for (Object data : (CompositeData[])param) {
|
||||
String s = Base64.encodeObject((CompositeDataSupport) data);
|
||||
innerJsonArray.add(s);
|
||||
}
|
||||
JsonObjectBuilder jsonObject = JsonLoader.createObjectBuilder();
|
||||
jsonObject.add(CompositeData.class.getName(), innerJsonArray);
|
||||
jsonArrayBuilder.add(jsonObject);
|
||||
} else if (param instanceof Object[]) {
|
||||
JsonArrayBuilder objectArrayBuilder = JsonLoader.createArrayBuilder();
|
||||
for (Object parameter : (Object[])param) {
|
||||
addToArray(parameter, objectArrayBuilder);
|
||||
}
|
||||
jsonArrayBuilder.add(objectArrayBuilder);
|
||||
} else {
|
||||
throw ActiveMQClientMessageBundle.BUNDLE.invalidManagementParam(param.getClass().getName());
|
||||
}
|
||||
|
@ -345,6 +322,29 @@ public final class JsonUtil {
|
|||
private JsonUtil() {
|
||||
}
|
||||
|
||||
public static Object truncate(final Object value, final int valueSizeLimit) {
|
||||
Object result = value;
|
||||
if (valueSizeLimit >= 0) {
|
||||
if (String.class.equals(value.getClass())) {
|
||||
String str = (String) value;
|
||||
if (str.length() > valueSizeLimit) {
|
||||
result = new StringBuilder(valueSizeLimit + 32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
|
||||
}
|
||||
} else if (value.getClass().isArray()) {
|
||||
if (byte[].class.equals(value.getClass())) {
|
||||
if (((byte[]) value).length > valueSizeLimit) {
|
||||
result = Arrays.copyOfRange((byte[]) value, 0, valueSizeLimit);
|
||||
}
|
||||
} else if (char[].class.equals(value.getClass())) {
|
||||
if (((char[]) value).length > valueSizeLimit) {
|
||||
result = Arrays.copyOfRange((char[]) value, 0, valueSizeLimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static class NullableJsonString implements JsonValue, JsonString {
|
||||
|
||||
private final String value;
|
||||
|
|
|
@ -708,7 +708,15 @@ public interface Message {
|
|||
* @return Returns the message in Map form, useful when encoding to JSON
|
||||
*/
|
||||
default Map<String, Object> toMap() {
|
||||
Map map = toPropertyMap();
|
||||
return toMap(-1);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the message in Map form, useful when encoding to JSON
|
||||
* @param valueSizeLimit that limits [] map values
|
||||
*/
|
||||
default Map<String, Object> toMap(int valueSizeLimit) {
|
||||
Map map = toPropertyMap(valueSizeLimit);
|
||||
map.put("messageID", getMessageID());
|
||||
Object userID = getUserID();
|
||||
if (getUserID() != null) {
|
||||
|
@ -728,6 +736,14 @@ public interface Message {
|
|||
* @return Returns the message properties in Map form, useful when encoding to JSON
|
||||
*/
|
||||
default Map<String, Object> toPropertyMap() {
|
||||
return toPropertyMap(-1);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Returns the message properties in Map form, useful when encoding to JSON
|
||||
* @param valueSizeLimit that limits [] map values
|
||||
*/
|
||||
default Map<String, Object> toPropertyMap(int valueSizeLimit) {
|
||||
Map map = new HashMap<>();
|
||||
for (SimpleString name : getPropertyNames()) {
|
||||
Object value = getObjectProperty(name.toString());
|
||||
|
@ -735,12 +751,12 @@ public interface Message {
|
|||
if (value instanceof SimpleString) {
|
||||
value = value.toString();
|
||||
}
|
||||
value = JsonUtil.truncate(value, valueSizeLimit);
|
||||
map.put(name.toString(), value);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
|
||||
/** This should make you convert your message into Core format. */
|
||||
ICoreMessage toCore();
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.activemq.artemis.message;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
@ -211,6 +212,32 @@ public class CoreMessageTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testToMapLimit() throws Exception {
|
||||
|
||||
CoreMessage coreMessage = new CoreMessage().initBuffer(BIGGER_TEXT.length() * 2);
|
||||
coreMessage.putStringProperty("prop", BIGGER_TEXT);
|
||||
coreMessage.putBytesProperty("bytesProp", BIGGER_TEXT.getBytes(StandardCharsets.UTF_8));
|
||||
|
||||
Assert.assertEquals(BIGGER_TEXT.length(), ((String)coreMessage.toMap().get("prop")).length());
|
||||
Assert.assertEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toMap().get("bytesProp")).length);
|
||||
|
||||
// limit the values
|
||||
Assert.assertNotEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toMap(40).get("bytesProp")).length);
|
||||
String mapVal = ((String)coreMessage.toMap(40).get("prop"));
|
||||
Assert.assertNotEquals(BIGGER_TEXT.length(), mapVal.length());
|
||||
Assert.assertTrue(mapVal.contains("more"));
|
||||
|
||||
mapVal = ((String)coreMessage.toMap(0).get("prop"));
|
||||
Assert.assertNotEquals(BIGGER_TEXT.length(), mapVal.length());
|
||||
Assert.assertTrue(mapVal.contains("more"));
|
||||
|
||||
Assert.assertEquals(BIGGER_TEXT.length(), Integer.parseInt(mapVal.substring(mapVal.lastIndexOf('+') + 1, mapVal.lastIndexOf('m')).trim()));
|
||||
|
||||
Assert.assertEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toPropertyMap().get("bytesProp")).length);
|
||||
Assert.assertNotEquals(BIGGER_TEXT.getBytes(StandardCharsets.UTF_8).length, ((byte[])coreMessage.toPropertyMap(40).get("bytesProp")).length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaveReceiveLimitedBytes() {
|
||||
CoreMessage empty = new CoreMessage().initBuffer(100);
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.RefCountMessage;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -835,7 +836,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
public abstract int getMemoryEstimate();
|
||||
|
||||
@Override
|
||||
public Map<String, Object> toPropertyMap() {
|
||||
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
|
||||
Map map = new HashMap<>();
|
||||
for (SimpleString name : getPropertyNames()) {
|
||||
Object value = getObjectProperty(name.toString());
|
||||
|
@ -843,6 +844,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
if (value instanceof Binary) {
|
||||
value = ((Binary)value).getArray();
|
||||
}
|
||||
value = JsonUtil.truncate(value, valueSizeLimit);
|
||||
map.put(name.toString(), value);
|
||||
}
|
||||
return map;
|
||||
|
|
|
@ -281,6 +281,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
private static final String MANAGEMENT_BROWSE_PAGE_SIZE = "management-browse-page-size";
|
||||
|
||||
private static final String MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = "management-message-attribute-size-limit";
|
||||
|
||||
private static final String MAX_CONNECTIONS_NODE_NAME = "max-connections";
|
||||
|
||||
private static final String MAX_QUEUES_NODE_NAME = "max-queues";
|
||||
|
@ -1250,6 +1252,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
addressSettings.setConfigDeleteAddresses(policy);
|
||||
} else if (MANAGEMENT_BROWSE_PAGE_SIZE.equalsIgnoreCase(name)) {
|
||||
addressSettings.setManagementBrowsePageSize(XMLUtil.parseInt(child));
|
||||
} else if (MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT.equalsIgnoreCase(name)) {
|
||||
addressSettings.setManagementMessageAttributeSizeLimit(XMLUtil.parseInt(child));
|
||||
} else if (DEFAULT_PURGE_ON_NO_CONSUMERS.equalsIgnoreCase(name)) {
|
||||
addressSettings.setDefaultPurgeOnNoConsumers(XMLUtil.parseBoolean(child));
|
||||
} else if (DEFAULT_MAX_CONSUMERS.equalsIgnoreCase(name)) {
|
||||
|
|
|
@ -788,11 +788,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
* @return
|
||||
*/
|
||||
private Map<String, Object>[] convertMessagesToMaps(List<MessageReference> refs) throws ActiveMQException {
|
||||
final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
|
||||
Map<String, Object>[] messages = new Map[refs.size()];
|
||||
int i = 0;
|
||||
for (MessageReference ref : refs) {
|
||||
Message message = ref.getMessage();
|
||||
messages[i++] = message.toMap();
|
||||
messages[i++] = message.toMap(attributeSizeLimit);
|
||||
}
|
||||
return messages;
|
||||
}
|
||||
|
@ -847,7 +848,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
Filter filter = FilterImpl.createFilter(filterStr);
|
||||
List<Map<String, Object>> messages = new ArrayList<>();
|
||||
queue.flushExecutor();
|
||||
final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
|
||||
final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
|
||||
final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
|
||||
final int limit = addressSettings.getManagementBrowsePageSize();
|
||||
int count = 0;
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
||||
try {
|
||||
|
@ -855,7 +858,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
MessageReference ref = iterator.next();
|
||||
if (filter == null || filter.match(ref.getMessage())) {
|
||||
Message message = ref.getMessage();
|
||||
messages.add(message.toMap());
|
||||
messages.add(message.toMap(attributeSizeLimit));
|
||||
}
|
||||
}
|
||||
} catch (NoSuchElementException ignored) {
|
||||
|
@ -895,12 +898,13 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
try {
|
||||
List<Map<String, Object>> messages = new ArrayList<>();
|
||||
queue.flushExecutor();
|
||||
final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
||||
// returns just the first, as it's the first only
|
||||
if (iterator.hasNext()) {
|
||||
MessageReference ref = iterator.next();
|
||||
Message message = ref.getMessage();
|
||||
messages.add(message.toMap());
|
||||
messages.add(message.toMap(attributeSizeLimit));
|
||||
}
|
||||
return messages.toArray(new Map[1]);
|
||||
}
|
||||
|
@ -985,7 +989,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
if (filter == null && groupByProperty == null) {
|
||||
result.put(null, getMessageCount());
|
||||
} else {
|
||||
final int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
|
||||
final int limit = addressSettingsRepository.getMatch(address).getManagementBrowsePageSize();
|
||||
int count = 0;
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
||||
try {
|
||||
|
@ -1552,13 +1556,14 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
Filter thefilter = FilterImpl.createFilter(filter);
|
||||
queue.flushExecutor();
|
||||
|
||||
final int attributeSizeLimit = addressSettingsRepository.getMatch(address).getManagementMessageAttributeSizeLimit();
|
||||
try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
|
||||
try {
|
||||
while (iterator.hasNext() && index < end) {
|
||||
MessageReference ref = iterator.next();
|
||||
if (thefilter == null || thefilter.match(ref.getMessage())) {
|
||||
if (index >= start) {
|
||||
c.add(OpenTypeSupport.convert(ref));
|
||||
c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
|
||||
}
|
||||
}
|
||||
index++;
|
||||
|
@ -1597,7 +1602,9 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
|
||||
clearIO();
|
||||
try {
|
||||
int limit = addressSettingsRepository.getMatch(queue.getAddress().toString()).getManagementBrowsePageSize();
|
||||
final AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
|
||||
final int attributeSizeLimit = addressSettings.getManagementMessageAttributeSizeLimit();
|
||||
final int limit = addressSettings.getManagementBrowsePageSize();
|
||||
int currentPageSize = 0;
|
||||
ArrayList<CompositeData> c = new ArrayList<>();
|
||||
Filter thefilter = FilterImpl.createFilter(filter);
|
||||
|
@ -1607,7 +1614,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
|||
while (iterator.hasNext() && currentPageSize++ < limit) {
|
||||
MessageReference ref = iterator.next();
|
||||
if (thefilter == null || thefilter.match(ref.getMessage())) {
|
||||
c.add(OpenTypeSupport.convert(ref));
|
||||
c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.Map;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
|
@ -46,7 +47,7 @@ public final class OpenTypeSupport {
|
|||
private OpenTypeSupport() {
|
||||
}
|
||||
|
||||
public static CompositeData convert(MessageReference ref) throws OpenDataException {
|
||||
public static CompositeData convert(MessageReference ref, int valueSizeLimit) throws OpenDataException {
|
||||
CompositeType ct;
|
||||
|
||||
ICoreMessage message = ref.getMessage().toCore();
|
||||
|
@ -57,11 +58,11 @@ public final class OpenTypeSupport {
|
|||
switch(type) {
|
||||
case Message.TEXT_TYPE:
|
||||
ct = TEXT_FACTORY.getCompositeType();
|
||||
fields = TEXT_FACTORY.getFields(ref);
|
||||
fields = TEXT_FACTORY.getFields(ref, valueSizeLimit);
|
||||
break;
|
||||
default:
|
||||
ct = BYTES_FACTORY.getCompositeType();
|
||||
fields = BYTES_FACTORY.getFields(ref);
|
||||
fields = BYTES_FACTORY.getFields(ref, valueSizeLimit);
|
||||
break;
|
||||
}
|
||||
return new CompositeDataSupport(ct, fields);
|
||||
|
@ -82,6 +83,7 @@ public final class OpenTypeSupport {
|
|||
protected TabularType longPropertyTabularType;
|
||||
protected TabularType floatPropertyTabularType;
|
||||
protected TabularType doublePropertyTabularType;
|
||||
protected Object[][] typedPropertyFields;
|
||||
|
||||
protected String getTypeName() {
|
||||
return Message.class.getName();
|
||||
|
@ -129,9 +131,21 @@ public final class OpenTypeSupport {
|
|||
addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
|
||||
addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
|
||||
addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
|
||||
|
||||
typedPropertyFields = new Object[][] {
|
||||
{CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class},
|
||||
{CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class},
|
||||
{CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class},
|
||||
{CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class},
|
||||
{CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class},
|
||||
{CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class},
|
||||
{CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class},
|
||||
{CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
|
||||
public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
|
||||
Map<String, Object> rc = new HashMap<>();
|
||||
ICoreMessage m = ref.getMessage().toCore();
|
||||
rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
|
||||
|
@ -154,49 +168,23 @@ public final class OpenTypeSupport {
|
|||
rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
|
||||
}
|
||||
|
||||
Map<String, Object> propertyMap = m.toPropertyMap();
|
||||
Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
|
||||
|
||||
rc.put(CompositeDataConstants.PROPERTIES, "" + propertyMap);
|
||||
rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit));
|
||||
|
||||
// only populate if there are some values
|
||||
TabularDataSupport tabularData;
|
||||
for (Object[] typedPropertyInfo : typedPropertyFields) {
|
||||
tabularData = null;
|
||||
try {
|
||||
rc.put(CompositeDataConstants.STRING_PROPERTIES, createTabularData(propertyMap, stringPropertyTabularType, String.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.STRING_PROPERTIES, new TabularDataSupport(stringPropertyTabularType));
|
||||
tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, createTabularData(propertyMap, booleanPropertyTabularType, Boolean.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.BOOLEAN_PROPERTIES, new TabularDataSupport(booleanPropertyTabularType));
|
||||
if (tabularData != null && !tabularData.isEmpty()) {
|
||||
rc.put((String) typedPropertyInfo[0], tabularData);
|
||||
} else {
|
||||
rc.put((String) typedPropertyInfo[0], null);
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.BYTE_PROPERTIES, createTabularData(propertyMap, bytePropertyTabularType, Byte.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.BYTE_PROPERTIES, new TabularDataSupport(bytePropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.SHORT_PROPERTIES, createTabularData(propertyMap, shortPropertyTabularType, Short.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.SHORT_PROPERTIES, new TabularDataSupport(shortPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.INT_PROPERTIES, createTabularData(propertyMap, intPropertyTabularType, Integer.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.INT_PROPERTIES, new TabularDataSupport(intPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.LONG_PROPERTIES, createTabularData(propertyMap, longPropertyTabularType, Long.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.LONG_PROPERTIES, new TabularDataSupport(longPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.FLOAT_PROPERTIES, createTabularData(propertyMap, floatPropertyTabularType, Float.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.FLOAT_PROPERTIES, new TabularDataSupport(floatPropertyTabularType));
|
||||
}
|
||||
try {
|
||||
rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, createTabularData(propertyMap, doublePropertyTabularType, Double.class));
|
||||
} catch (IOException e) {
|
||||
rc.put(CompositeDataConstants.DOUBLE_PROPERTIES, new TabularDataSupport(doublePropertyTabularType));
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
@ -273,14 +261,14 @@ public final class OpenTypeSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(ref);
|
||||
public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
|
||||
ICoreMessage m = ref.getMessage().toCore();
|
||||
if (!m.isLargeMessage()) {
|
||||
ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
|
||||
byte[] bytes = new byte[bodyCopy.readableBytes()];
|
||||
byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit ? bodyCopy.readableBytes() : valueSizeLimit + 1];
|
||||
bodyCopy.readBytes(bytes);
|
||||
rc.put(CompositeDataConstants.BODY, bytes);
|
||||
rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
|
||||
} else {
|
||||
rc.put(CompositeDataConstants.BODY, new byte[0]);
|
||||
}
|
||||
|
@ -298,15 +286,15 @@ public final class OpenTypeSupport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getFields(MessageReference ref) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(ref);
|
||||
public Map<String, Object> getFields(MessageReference ref, int valueSizeLimit) throws OpenDataException {
|
||||
Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
|
||||
ICoreMessage m = ref.getMessage().toCore();
|
||||
if (!m.isLargeMessage()) {
|
||||
if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
|
||||
} else {
|
||||
SimpleString text = m.getReadOnlyBodyBuffer().readNullableSimpleString();
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? text.toString() : "");
|
||||
final String text = m.getReadOnlyBodyBuffer().readString();
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, text != null ? JsonUtil.truncate(text, valueSizeLimit) : "");
|
||||
}
|
||||
} else {
|
||||
rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
|
||||
|
|
|
@ -127,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
public static final boolean DEFAULT_ENABLE_METRICS = true;
|
||||
|
||||
public static final int MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT = 256;
|
||||
|
||||
private AddressFullMessagePolicy addressFullMessagePolicy = null;
|
||||
|
||||
private Long maxSizeBytes = null;
|
||||
|
@ -253,6 +255,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
private Boolean enableMetrics = null;
|
||||
|
||||
private Integer managementMessageAttributeSizeLimit = null;
|
||||
|
||||
//from amq5
|
||||
//make it transient
|
||||
private transient Integer queuePrefetch = null;
|
||||
|
@ -318,6 +322,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
this.defaultGroupFirstKey = other.defaultGroupFirstKey;
|
||||
this.defaultRingSize = other.defaultRingSize;
|
||||
this.enableMetrics = other.enableMetrics;
|
||||
this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
|
||||
}
|
||||
|
||||
public AddressSettings() {
|
||||
|
@ -914,6 +919,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return this;
|
||||
}
|
||||
|
||||
public int getManagementMessageAttributeSizeLimit() {
|
||||
return managementMessageAttributeSizeLimit != null ? managementMessageAttributeSizeLimit : AddressSettings.MANAGEMENT_MESSAGE_ATTRIBUTE_SIZE_LIMIT;
|
||||
}
|
||||
|
||||
public AddressSettings setManagementMessageAttributeSizeLimit(int managementMessageAttributeSizeLimit) {
|
||||
this.managementMessageAttributeSizeLimit = managementMessageAttributeSizeLimit;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* merge 2 objects in to 1
|
||||
*
|
||||
|
@ -1029,6 +1043,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
if (managementBrowsePageSize == null) {
|
||||
managementBrowsePageSize = merged.managementBrowsePageSize;
|
||||
}
|
||||
if (managementMessageAttributeSizeLimit == null) {
|
||||
managementMessageAttributeSizeLimit = merged.managementMessageAttributeSizeLimit;
|
||||
}
|
||||
if (queuePrefetch == null) {
|
||||
queuePrefetch = merged.queuePrefetch;
|
||||
}
|
||||
|
@ -1320,6 +1337,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
defaultGroupRebalancePauseDispatch = BufferHelper.readNullableBoolean(buffer);
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() > 0) {
|
||||
managementMessageAttributeSizeLimit = BufferHelper.readNullableInteger(buffer);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1383,7 +1404,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
SimpleString.sizeofNullableString(expiryQueuePrefix) +
|
||||
SimpleString.sizeofNullableString(expiryQueueSuffix) +
|
||||
BufferHelper.sizeOfNullableBoolean(enableMetrics) +
|
||||
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch);
|
||||
BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
|
||||
BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1510,6 +1532,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
|
||||
BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalancePauseDispatch);
|
||||
|
||||
BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
|
@ -1581,6 +1604,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
result = prime * result + ((expiryQueuePrefix == null) ? 0 : expiryQueuePrefix.hashCode());
|
||||
result = prime * result + ((expiryQueueSuffix == null) ? 0 : expiryQueueSuffix.hashCode());
|
||||
result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
|
||||
result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -1796,6 +1820,11 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
return false;
|
||||
} else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
|
||||
return false;
|
||||
if (managementMessageAttributeSizeLimit == null) {
|
||||
if (other.managementMessageAttributeSizeLimit != null)
|
||||
return false;
|
||||
} else if (!managementMessageAttributeSizeLimit.equals(other.managementMessageAttributeSizeLimit))
|
||||
return false;
|
||||
if (queuePrefetch == null) {
|
||||
if (other.queuePrefetch != null)
|
||||
return false;
|
||||
|
@ -2017,6 +2046,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
|
|||
configDeleteAddresses +
|
||||
", managementBrowsePageSize=" +
|
||||
managementBrowsePageSize +
|
||||
", managementMessageAttributeSizeLimit=" +
|
||||
managementMessageAttributeSizeLimit +
|
||||
", defaultMaxConsumers=" +
|
||||
defaultMaxConsumers +
|
||||
", defaultPurgeOnNoConsumers=" +
|
||||
|
|
|
@ -56,6 +56,6 @@ public class CoreTransactionDetail extends TransactionDetail {
|
|||
|
||||
@Override
|
||||
public Map<String, Object> decodeMessageProperties(Message msg) {
|
||||
return msg.toMap();
|
||||
return msg.toMap(256);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3804,7 +3804,15 @@
|
|||
<xsd:element name="management-browse-page-size" type="xsd:int" default="200" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how many message a management resource can browse
|
||||
how many message a management resource can browse, list or filter
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="management-message-attribute-size-limit" type="xsd:int" default="256" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
the size limit of any message attribute value returned from a browse ,list or filter. Attribute values that exceed with be truncated
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
|
|
@ -505,6 +505,8 @@
|
|||
<default-consumer-window-size>10000</default-consumer-window-size>
|
||||
<retroactive-message-count>10</retroactive-message-count>
|
||||
<enable-metrics>false</enable-metrics>
|
||||
<management-browse-page-size>400</management-browse-page-size>
|
||||
<management-message-attribute-size-limit>265</management-message-attribute-size-limit>
|
||||
</address-setting>
|
||||
</address-settings>
|
||||
<resource-limit-settings>
|
||||
|
|
|
@ -39,6 +39,7 @@ import javax.jms.Session;
|
|||
import javax.jms.TextMessage;
|
||||
import javax.json.JsonArray;
|
||||
import javax.json.JsonObject;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
|
||||
public class JMXManagementTest extends JMSClientTestSupport {
|
||||
|
@ -107,8 +108,11 @@ public class JMXManagementTest extends JMSClientTestSupport {
|
|||
session.begin();
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setApplicationProperty("TEST_BINARY", new Binary("TEST".getBytes()));
|
||||
message.setApplicationProperty("TEST_STRING", "TEST");
|
||||
message.setText("TEST");
|
||||
|
||||
final String oneK = new String(new char[1024]).replace("\0", "$");
|
||||
message.setApplicationProperty("TEST_BIG_BINARY", new Binary(oneK.getBytes(StandardCharsets.UTF_8)));
|
||||
message.setApplicationProperty("TEST_STRING", oneK);
|
||||
message.setText("NOT_VISIBLE");
|
||||
sender.send(message);
|
||||
session.commit();
|
||||
|
||||
|
@ -116,6 +120,18 @@ public class JMXManagementTest extends JMSClientTestSupport {
|
|||
QueueControl queueControl = createManagementControl(queue, queue);
|
||||
String firstMessageAsJSON = queueControl.getFirstMessageAsJSON();
|
||||
Assert.assertNotNull(firstMessageAsJSON);
|
||||
|
||||
// Json is still bulky!
|
||||
Assert.assertTrue(firstMessageAsJSON.length() < 1500);
|
||||
Assert.assertFalse(firstMessageAsJSON.contains("NOT_VISIBLE"));
|
||||
|
||||
// composite data limits
|
||||
Map<String, Object>[] result = queueControl.listMessages("");
|
||||
assertEquals(1, result.length);
|
||||
|
||||
final Map<String, Object> msgMap = result[0];
|
||||
Assert.assertTrue(msgMap.get("TEST_STRING").toString().length() < 512);
|
||||
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
|
|
|
@ -488,6 +488,122 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMessageAttributeLimits() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
SimpleString queue = RandomUtil.randomSimpleString();
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(100);
|
||||
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
|
||||
|
||||
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
|
||||
|
||||
byte[] twoKBytes = new byte[2048];
|
||||
for (int i = 0; i < 2048; i++) {
|
||||
twoKBytes[i] = '*';
|
||||
}
|
||||
|
||||
String twoKString = new String(twoKBytes);
|
||||
|
||||
ClientMessage clientMessage = session.createMessage(false);
|
||||
|
||||
clientMessage.putStringProperty("y", "valueY");
|
||||
clientMessage.putStringProperty("bigString", twoKString);
|
||||
clientMessage.putBytesProperty("bigBytes", twoKBytes);
|
||||
clientMessage.putObjectProperty("bigObject", twoKString);
|
||||
|
||||
clientMessage.getBodyBuffer().writeBytes(twoKBytes);
|
||||
|
||||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
producer.send(clientMessage);
|
||||
|
||||
Wait.assertEquals(1, () -> getMessageCount(queueControl));
|
||||
|
||||
assertTrue(server.getPagingManager().getPageStore(address).getAddressSize() > 2048);
|
||||
|
||||
Map<String, Object>[] messages = queueControl.listMessages("");
|
||||
assertEquals(1, messages.length);
|
||||
for (String key : messages[0].keySet()) {
|
||||
Object value = messages[0].get(key);
|
||||
System.err.println( key + " " + value);
|
||||
assertTrue(value.toString().length() <= 150);
|
||||
|
||||
if (value instanceof byte[]) {
|
||||
assertTrue(((byte[])value).length <= 150);
|
||||
}
|
||||
}
|
||||
|
||||
String all = queueControl.listMessagesAsJSON("");
|
||||
assertTrue(all.length() < 1024);
|
||||
|
||||
String first = queueControl.getFirstMessageAsJSON();
|
||||
assertTrue(first.length() < 1024);
|
||||
|
||||
CompositeData[] browseResult = queueControl.browse(1, 100);
|
||||
for (CompositeData compositeData : browseResult) {
|
||||
for (String key : compositeData.getCompositeType().keySet()) {
|
||||
Object value = compositeData.get(key);
|
||||
System.err.println("" + key + ", " + value);
|
||||
|
||||
if (value != null) {
|
||||
|
||||
if (key.equals("StringProperties")) {
|
||||
// these are very verbose composite data structures
|
||||
assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 2048);
|
||||
} else {
|
||||
assertTrue(value.toString().length() + " truncated? " + key, value.toString().length() <= 512);
|
||||
}
|
||||
|
||||
if (value instanceof byte[]) {
|
||||
assertTrue("truncated? " + key, ((byte[]) value).length <= 150);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTextMessageAttributeLimits() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
SimpleString queue = RandomUtil.randomSimpleString();
|
||||
|
||||
AddressSettings addressSettings = new AddressSettings().setManagementMessageAttributeSizeLimit(10);
|
||||
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
|
||||
|
||||
session.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(durable));
|
||||
|
||||
final String twentyBytes = new String(new char[20]).replace("\0", "#");
|
||||
|
||||
ClientMessage clientMessage = createTextMessage(session, twentyBytes, true);
|
||||
clientMessage.putStringProperty("x", twentyBytes);
|
||||
|
||||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
Assert.assertEquals(0, getMessageCount(queueControl));
|
||||
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
producer.send(clientMessage);
|
||||
|
||||
Wait.assertEquals(1, () -> getMessageCount(queueControl));
|
||||
|
||||
Map<String, Object>[] messages = queueControl.listMessages("");
|
||||
assertEquals(1, messages.length);
|
||||
assertTrue("truncated? ", ((String)messages[0].get("x")).contains("more"));
|
||||
|
||||
CompositeData[] browseResult = queueControl.browse(1, 100);
|
||||
for (CompositeData compositeData : browseResult) {
|
||||
for (String key : new String[] {"text", "PropertiesText", "StringProperties"}) {
|
||||
assertTrue("truncated? : " + key, compositeData.get(key).toString().contains("more"));
|
||||
}
|
||||
}
|
||||
|
||||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetMessagesAdded() throws Exception {
|
||||
SimpleString address = RandomUtil.randomSimpleString();
|
||||
|
@ -1082,7 +1198,7 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
QueueControl queueControl = createManagementControl(address, queue);
|
||||
|
||||
ClientProducer producer = session.createProducer(address);
|
||||
producer.send(session.createMessage(durable));
|
||||
producer.send(session.createMessage(durable).putBytesProperty("bytes", new byte[]{'%'}));
|
||||
producer.send(session.createMessage(durable));
|
||||
|
||||
Wait.assertEquals(2, () -> queueControl.listMessages(null).length);
|
||||
|
|
Loading…
Reference in New Issue